
Question/suggestion on how to get immediate feedback from a publish call #101

Closed svanharmelen closed 3 years ago

svanharmelen commented 3 years ago

I have a question about the way the publish call works. If I understand the code correctly, it looks like the async publish method makes a call to publish the message and then returns a future that can be polled until the on_failure or on_success callback is called.

The problem with this is that it can take very long. Say that my connection is lost and I have configured the client to auto-reconnect and to persist messages. While the C call to publish a message will place the message on the queue and return immediately, the Rust call returns a future that I have to drive to completion in order to find out if it failed or succeeded.

It would be nice if we could mimic the C behavior and give callers a choice about what to do once a message is added to the queue. In my case I'm good once I know the message is in the queue and will be handled by the persistence logic of the C package, which takes care of its delivery.

So I thought that maybe changing the method to something like this would be a nice improvement:

pub fn publish(&self, msg: Message) -> Result<DeliveryToken>

This way you can distinguish between the "internal" publish call (putting the message on the queue) and the "external" publish call (the actual sending of the message) and let the caller use whichever is applicable. One could check only the Result, or first check the Result and then .await on the token to know whether the message was successfully sent.
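For illustration, a caller could then use it in either style (just a rough sketch of the proposed API, assuming `use paho_mqtt as mqtt;` and an async context):

let msg = mqtt::Message::new("some/topic", "payload", mqtt::QOS_1);

// Only check that the message was created and queued successfully...
let token = client.publish(msg)?;

// ...or additionally await the token to learn whether delivery succeeded.
token.await?;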

Curious if you would be open to a PR implementing something like this?

svanharmelen commented 3 years ago

Seems super trivial, but maybe I overlooked something... Again, curious what you think of it.

fpagliughi commented 3 years ago

As I mentioned in PR #102, I'm leaning against this solution because:

  1. It forces two checks for errors
  2. It breaks consistency with the similar Paho libraries in other languages.

But I also wonder if the idea doesn't move us away from the evolution of this library towards the async/await paradigm. Having the functions return a Result wrapped in a Future moves us toward eventually making functions like publish() async in some future version of the library, without breaking much of the existing code that uses the library with await.

Wouldn't wrapping the Future in a Result break the pattern?

svanharmelen commented 3 years ago

I get your point, but let's first see if we are aligned on the problem... Maybe you have a suggestion or insight on how to solve it differently, but I had issues with the following:

I have a question about the way the publish call works. If I understand the code correctly, 
it looks like the async publish method makes a call to publish the message and then returns 
a future that can be polled until the on_failure or on_success callback is called.

The problem with this is that it can take very long. Say that my connection is lost and I have 
configured the client to auto-reconnect and to persist messages. While the C call to publish a
message will place the message on the queue and return immediately, the Rust call returns a
future that I have to drive to completion in order to find out if it failed or succeeded.

So that is until the internet connection is restored and the queue is emptied up until the 
message I'm trying to send...

Currently we have a bunch of producers all sending their messages over a single MPSC channel. The consumer is running in its own Tokio task, iterates over all incoming messages, and then batches and publishes them. So when there is high traffic coming through the channel I want to iterate quickly and not wait for the previous message to actually be sent (as it can be in the queue for quite some time if there is a bad internet uplink).
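Roughly, the consumer side of that setup looks like this (a simplified sketch; the batching is omitted and the names are just placeholders):

use paho_mqtt as mqtt;
use tokio::sync::mpsc;

async fn consumer(cli: mqtt::AsyncClient, mut rx: mpsc::Receiver<mqtt::Message>) {
    while let Some(msg) = rx.recv().await {
        // With the current API this await can stall the loop for a long time
        // when the uplink is down, blocking all producers behind the channel.
        if let Err(err) = cli.publish(msg).await {
            eprintln!("delivery failed: {}", err);
        }
    }
}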

So either I "trust" the C client logic to persist and eventually send the message (like I'm doing now) or I have to spawn a new dedicated Tokio task for each batch of messages I'm trying to send in order to prevent blocking the channel. But, again, during high load and a moment were the internet connection is pore or fails, this could mean we have a lot of tasks hanging around doing nothing, other then waiting for the call to return. And once it returns we do noting with the result, other then write a log message if the delivery failed.

Code-wise it probably means that I have to wrap the client in an Arc and clone it for each task... Next to managing all the tasks, that feels much more complicated and fragile than the solution in this PR. Just hand the message over to the queue and trust the persistence and queue logic for the actual delivery...

fpagliughi commented 3 years ago

Well, first, the client is just an Arc that can be cheaply cloned to share among publisher threads...

https://github.com/eclipse/paho.mqtt.rust/blob/976e7ea538d75b960731183515207ee40dd6a153/src/async_client.rs#L92-L97

svanharmelen commented 3 years ago

Good point 😏

Yet why should I have to continuously start and manage tasks, when I can also manage everything from a single long-running task instead?

Additionally, it somehow feels like a second layer of code trying to manage the same thing: delivering the message.

The C code already has a ton of stuff for monitoring and managing the actual delivery, so it feels like I should be able to hand off my message and just trust and build on that existing logic.

All in all I just don't see any benefits, only downsides: more tasks, more code paths, more things to manage and test.

So the fact that the client is cheap to clone is (IMO) not enough reason to reject an approach like the one suggested in this PR (or at least something similar in how it can be used).

fpagliughi commented 3 years ago

I guess, though, what I'm not seeing is the time delay you mention. On failure of the C send function, the Token completes immediately with a failure. So on return of the publish() call, the future is ready. It doesn't wait for a callback to know that.

I guess you're saying that the API doesn't present a non-blocking way to know that?

It's a little unfortunate that the Future works this way, but in reality, these aren't traditional Futures. The C lib is the real executor, and the task starts when you call the publish() function. When you "drive the future to completion" it really just returns immediately if the C call failed or already completed (as with QoS=0), or it blocks the calling task until the C lib completes on an ACK.

Before they became Futures, the Tokens were just managed by a Mutex and Condition Variable, so you could check and re-check them as often as you liked without consuming them.

fpagliughi commented 3 years ago

To your point, above, I believe you're correct that I'm not aligned on the problem yet. Apologies.

The problem with this is that it can take very long.

That's not true if the C send function failed with an error (non-zero) return code. When the token is created with the error code, it is marked "complete" on construction. So await'ing it would only take slightly longer (for an uncontested Mutex lock) than to check the Result code.
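To sketch what I mean (nothing new here, just the current behavior):

let tok = cli.publish(msg);

// If the C send call failed, the token was created already complete, so this
// await resolves (almost) immediately with the error. Otherwise it waits for
// the operation to finish (e.g. the PUBACK for QoS > 0).
if let Err(err) = tok.await {
    eprintln!("publish failed: {}", err);
}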

fpagliughi commented 3 years ago

Or, maybe, to put the question another way... If the API will evolve into a standard async/await and we expect something like:

impl AsyncClient {
    pub async fn connect(&self, opts: ConnectOptions) -> Result<ServerResponse> { ... }
    pub async fn disconnect(&self, opts: DisconnectOptions) -> Result<()> { ... }
    pub async fn publish(&self, msg: Message) -> Result<()> { ... }
    // ...
}

How would we handle this issue?

svanharmelen commented 3 years ago

I guess you're saying that the API doesn't present a non-blocking way to know that?

Yeah I think so...

It's a little unfortunate that the Future works this way, but in reality, these aren't traditional Futures

I know... I went through all the code (also the C code)

That's not true if the C send function failed with an error (non-zero) return code. When the token is created with the error code, it is marked "complete" on construction. So await'ing it would only take slightly longer (for an uncontested Mutex lock) than to check the Result code.

Yes, but I am "required" to call .await in order to find out. And once I do, I'll be stuck waiting until the message is actually delivered (assuming the queuing was successful). In that way I almost see this as two separate logical operations:

  1. Queue a message
  2. Deliver the message

I just want to know if it's properly queued. After that I'll leave it to the C client. But even for people who want to wait on the delivery, this two-step API seems pretty reasonable. Get an Ok(token) back, you know it's queued successfully. Want to know when it's delivered? Call token.await after that.

Of course you could use things like tokio::select with a second branch configured as a timeout, or tokio::time::timeout, or something along those lines, but to be honest that all feels a bit dirty and error-prone to me. I much prefer a clear result instead. Also, for people not so deep into the inner workings of this package, it would be hard to explain that you can use the returned token like that if, like me, you only want to know whether it's queued.
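Just to make the workaround concrete, I mean something like this (a sketch; the timeout duration is an arbitrary value):

use std::time::Duration;
use tokio::time::timeout;

let tok = cli.publish(msg);
match timeout(Duration::from_millis(1), tok).await {
    // The token completed right away with an error: creating/queuing failed.
    Ok(Err(err)) => eprintln!("publish failed immediately: {}", err),
    // The token completed right away with success (e.g. QoS 0).
    Ok(Ok(_)) => {}
    // Timed out: presumably queued and still in flight, but we lost the token.
    Err(_) => {}
}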

svanharmelen commented 3 years ago

In that case I would almost make an additional fn:

impl AsyncClient {
    pub async fn connect(&self, opts: ConnectOptions) -> Result<ServerResponse> { ... }
    pub async fn disconnect(&self, opts: DisconnectOptions) -> Result<()> { ... }
    pub async fn queue(&self, msg: Message) -> Result<()> { ... }
    pub async fn publish(&self, msg: Message) -> Result<()> { ... }
    // ...
}

EDIT: Where publish does both the queue and publish action of course!

But I get your point... If you call pub async fn publish(&self, msg: Message) -> Result<()> { ... } you probably expect the result to be the result of the actual publish, not of the queue action.

svanharmelen commented 3 years ago

The reason why I'm even looking at this is that one of the errors (which is now fixed) in the C client caused some messages to not be handled properly; they got "lost", causing the task to never unblock.

I then went into the code to better understand the logic, and to me the problem is that the queuing mechanism (incl. the persistence logic) sits between my publish call and the actual publish.

fpagliughi commented 3 years ago

Well, yeah, I guess the internal implementation needs to work as the first step!!!

Which brings up another point: the proposed solutions - returning a Result<> or the async queue() function - expose an internal implementation detail in the public API.

And that detail will most definitely change, though perhaps off in the future.

This API will likely evolve into the async functions before too long. That shouldn't change the applications using async/await because the existing API is mostly just the de-sugared version of them; at least to the outside world. At some time in the more distant future, this library will likely become a 100% Rust async/await implementation, and it would be nice to do that without affecting the API too much.

I guess I'm trying to keep the bigger picture in mind. Things like I mentioned: consistency in the API itself (all the other functions return a token), some consistency between the Paho libraries, and not making this particular use case easier while making many other use cases more difficult by requiring two error checks for every publish.

svanharmelen commented 3 years ago

returning Result<> or the async queue() function - are exposing an internal implementation detail in the public API.

I'm not sure if I agree with that statement. When creating a client you already have to configure whether and how many messages you want to hold in the queue (max_buffered_messages) and whether you want to persist those messages. So the notion of a queue is already exposed through the existing configuration options.

worrying about making this particular use case easier while making many other use cases more difficult by requiring two error checks for every publish.

That would not be the case with a dedicated queue method, right? As a user you can then choose which method to use, and in both cases you get just one result. The existing publish method would not change, and people would use either one or the other to hand off a message.

Thinking a bit more about it, I really believe an additional method to only queue a message would be great. Optionally the queue method could take callbacks to be used as custom on_failure and on_success callbacks, for people who want to log the final result when the message is delivered without having to wait for it themselves. Right now those callbacks (in the existing publish method) are used to update and complete the token future, but they could be used differently with a queue method, right?

I really believe adding such a method adds value and improves the way this package can be used, without making it harder to update the publish method in the future (or any async/await logic in general). It just allows for another way to handle messages which in some cases can add value (as it does in our setup).

fpagliughi commented 3 years ago

you already have to configure whether and how many messages you want to hold in the queue (max_buffered_messages)

Yes, but the intent there was off-line buffering: meaning how many messages it should hold when the client is not connected. Perhaps a pure Rust async implementation would use separate tasks to serialize outbound packets and transmit them - with a queue between them. But again, that would be an internal implementation detail. Maybe it would serialize and write_all() in one task with no queue, except for off-line buffering.

But, anyway, how would a queue() function work? It would return a future that is already completed when returned? That's probably not right. Plus you would lose track of the message and never know if it completed transmission. That's sort of the purpose of QoS=0, although in this case, the internal library would still be trying to deliver the message, even if you didn't know that.

Maybe the queue() function wouldn't be async, but what you originally proposed: A Result in a Future in a Result. Something like:

impl AsyncClient {
    pub async fn connect(&self, opts: ConnectOptions) -> Result<ServerResponse> { ... }
    pub async fn publish(&self, msg: Message) -> Result<()> { ... }

    pub fn queue(&self, msg: Message) -> Result<impl Future<Output = Result<()>>> { ... }
    // ...
}

Of course, if we do this, then someone will also want a send() function that returns a future that completes when the packet leaves the queue and is put on the wire!

pub fn send(&self, msg: Message) -> Result<impl Future<Output = impl Future<Output = Result<()>>>> { ... }

This is an interesting problem. I wonder what other asynchronous libraries do in this situation. This is probably a good question for the Rust forum for some ideas.

svanharmelen commented 3 years ago

It would be great if we could just discuss this face to face, as it takes so much effort like this 😉

I would expect that the queue function doesn't return a future, just a result. What purpose would it have if the result also contained a future? In that case you should just use the existing publish function. Same for the send function you described. That is essentially what the publish function already gives you, right?

So what is wrong with something like this:

pub fn queue(&self, msg: Message) -> Result<()> { ... }

Or maybe even something like this:

pub fn queue<F>(&self, msg: Message, on_failure_cb: F, on_success_cb: F) -> Result<()>
    where F: FnMut(&AsyncClient) + 'static
{ ... }

That way you can still get notified when the message is actually delivered and log something if you want.

I don't see why offering two different ways to handle sending a message would be an issue. It just enables more use cases, IMO.

fpagliughi commented 3 years ago

A Future in a Result is what you proposed in your PR #102! :smile:

pub fn queue(&self, msg: Message) -> Result<impl Future<Output = Result<()>>> { ... }

is effectively the same as

pub fn publish(&self, msg: Message) -> Result<DeliveryToken> { ... }

as the DeliveryToken implements a Future. This is back to your original idea that you could detect if the queuing failed immediately, and if not, get a Future to track when the publish completed (was PUBACK'ed).

svanharmelen commented 3 years ago

A Future in a Result is what you proposed in your PR #102! 😄

Yes, but since you don't like the approach in PR #102 I am now suggesting another solution (adding a queue function).

I don't understand how you now come up with this:

pub fn queue(&self, msg: Message) -> Result<impl Future<Output = Result<()>>> { ... }

That is totally not what I just suggested right?

So what is wrong with something like this:

pub fn queue(&self, msg: Message) -> Result<()> { ... }

Or maybe even something like this:

pub fn queue<F>(&self, msg: Message, on_failure_cb: F, on_success_cb: F) -> Result<()>
    where F: FnMut(&AsyncClient) + 'static
{ ... }

fpagliughi commented 3 years ago

I posted the (simplified) question here: https://users.rust-lang.org/t/tracking-the-state-of-an-async-operation/51726

svanharmelen commented 3 years ago

Funny... So the first two options mentioned are actually my first (PR #102) and second (add a queue function with callbacks) suggestions 😂

It does however surprise me that you responded with "That last one is interesting; hadn't thought of that" to the third option, as that again requires people to check two results instead of one (which was one of the reasons you didn't like the solution in PR #102).

After all these discussions, I very much believe adding a queue function like I suggested (the one with the callbacks) is actually the best solution. It gives a sync response to a sync action and still offers a way to track the end result without requiring a lot of additional logic.

svanharmelen commented 3 years ago

But... Happy to see several suggestions/options, as that most likely means we will eventually be able to add a solution for this one 😄

fpagliughi commented 3 years ago

No, you don't need to check twice on that third option. That's why it's interesting.

The Token itself completes when the message is delivered - meaning there is no change there from the existing API. Nothing breaks:

cli.publish(msg).await?;

Same as before.

The calls to tok.queue() and/or tok.write() are totally optional, and would only need to be called by anyone interested in those phases of the state of the operation. Not to mention that it tracks the state of the operation via the Token, which is the purpose (and was the original use) of the Token object.

It would complicate the DeliveryToken quite a bit, but not spill into the Client or the applications if they don't want it.

But it also provides a means to eventually cover the additional state (written) at some point in the future. (Believe me - someone will ask for that next.) And it provides an easy path for tracking the state of other operations like connect, disconnect, subscribe, etc.

Keep in mind that with 3yrs of this library and 8yrs of the similar C++ library API, and half a million combined downloads... you're the only person that ever asked for this feature. :smiley:

And it was based on a bug that has since been fixed, and some assumed time delays that don't exist. So I'm not sure how universally useful or popular this might be.

fpagliughi commented 3 years ago

But, honestly, I think using callbacks is my least favorite of all. It would totally break the flow of an otherwise simple async client. For that matter, if async/await, or even Futures had been around when I started the library, I probably never would have implemented callbacks - at least not for operation success/failure.

fpagliughi commented 3 years ago

Damn. Actually, if we're just adding functions to the Tokens, we could add anything, right? It doesn't need to be something that has to return a future. The "queued" success/error will always have been completed or failed by the return of publish(), so it could just have query-state functions: .is_complete(), .is_err(), .state(), or something like that.

svanharmelen commented 3 years ago

Ah I see... You added an example in your reply that indeed shows a solution that doesn't require calling the intermediate steps. Interesting...

It would totally break the flow of an otherwise simple async client

How would it break the flow? It's an additional method that you can optionally use. But if you don't care, you can just ignore it and keep using the existing methods. In that case nothing changes for you, right?

Keep in mind that with 3yrs of this library and 8yrs of the similar C++ library API, and half a million combined downloads... you're the only person that ever asked for this feature. 😃

Don't see how that is relevant...

And some assumed time delays that don't exist

The time delays I'm talking about are very much there and do exist. How is it not true that a message can be in the C client's queue for a couple of minutes (maybe even longer)? That is the "delay" I'm talking about...

Seems like "talking" through typing and having different native languages is difficult... Who could have known πŸ˜‚

svanharmelen commented 3 years ago

Damn. Actually, if we're just adding functions to the Tokens, we could add anything, right? It doesn't need to be something that has to return a future.

I agree that this is indeed a super interesting direction to think about... But I think returning a future does add more value, as then you can .await until that step is completed instead of having to add a while loop or something like that.

Also, I'm not sure what is_completed() or is_queued() should return if not a future. Just a bool? But then what if the queuing failed? What calls would I make to find that out and get the error that happened?

svanharmelen commented 3 years ago

I actually already thought about using now_or_never (as I already use it elsewhere in our code), but I'd just like to have a more concrete result, like I initially proposed or later suggested by means of the queue function with callbacks.

But I understand that will not happen in a simple way like suggested in #102, so I'll either keep applying that solution to my own fork, or I'll try to use now_or_never after all. Thanks for the time and discussion.
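For reference, applied to this library that would look roughly like this (a sketch; it assumes the futures crate is available):

use futures::future::FutureExt;

let mut tok = cli.publish(msg);

// Poll the token exactly once without blocking.
match (&mut tok).now_or_never() {
    // Completed immediately with an error: creating/queuing the message failed.
    Some(Err(err)) => eprintln!("publish failed right away: {}", err),
    // Completed immediately with success (e.g. QoS 0).
    Some(Ok(_)) => {}
    // Still pending: the message is queued; keep `tok` to await delivery later.
    None => {}
}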

fpagliughi commented 3 years ago

Yeah, I suppose I have a pre-existing idea of how the tokens should work, from using the libraries for so long. The C++ lib has

void wait()
bool try_wait()
bool wait_for(duration)
bool wait_until(time_point)

So you have a lot of options to check if the result is there without committing to wait forever, or consuming the token. (And you can call them from different threads, repeatedly without fear).

These options kinda disappeared from the Rust lib when the Tokens were converted to implement Futures.

For the non-Future status functions, I meant them for this specific usage: where the Future might already be complete upon returning from publish(), and/or a non-blocking check before doing something else. They're not meant to be used in a polling loop.

And I thought the callbacks for the queued state would be really weird, because they would only ever be called from inside publish()... before it returned, not like an async callback that happens some time later. Right?

fpagliughi commented 3 years ago

So, to come back around. This is what I had been thinking we needed from the beginning. The Token is the result. We just need a way to get the current status from it without committing to block for days at a time, and without consuming it if it's not complete.

Maybe, to keep compatibility, it can be implemented as a try_wait() -> Result<()> for folks not using await (i.e. everyone using a released version of the library!).

fpagliughi commented 3 years ago

Oh, and to keep the full history here, the two suggestions from the Rust Forum were:

From Kornel:

Returning result-wrapped future makes a lot of sense for things that may fail synchronously.

Another option is to taking some form of callback or channel as an argument that will be notified about progress to each stage.

Or you could return an object that requires user to call and await each stage:

e.queue().await?;
e.write().await?;
e.finish().await?;

From Alice:

You can use poll_fn.

let res = poll_fn(|cx| Poll::Ready(future.poll(cx))).await;

or now_or_never 2 (on a mutable reference to not consume it)

use futures::future::FutureExt;
let res = (&mut future).now_or_never();

in either case, the future object will still exist even if it completes, but trying to await it afterwards will fail if it completed on your one poll.

fpagliughi commented 3 years ago

I will reopen this to remind me to actually implement it for the next release! And we can discuss, because I may try a few of these different ideas to see what works best.

fpagliughi commented 3 years ago

OK. I added try_publish() functions to the AsyncClient and Topic objects, like:

pub fn try_publish(&self, msg: Message) -> Result<DeliveryToken> { ... }

This is similar to what was proposed in PR #102, but it keeps the old publish() in its original form, and also adds a new error variant:

Error::Publish(rc: i32, msg: Message)

so if try_publish() fails, you can recover the message and try sending it again. This actually didn't take much code at all, as one can be defined in terms of the other: i.e. publish() just calls try_publish().
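Roughly speaking it's something like this (a simplified sketch, not the exact library code; the `from_error` constructor below just stands in for however the crate builds an already-failed token):

pub fn publish(&self, msg: Message) -> DeliveryToken {
    match self.try_publish(msg) {
        Ok(tok) => tok,
        // Queuing failed: return a token that is already completed with the
        // error, so awaiting (or waiting on) it fails immediately.
        Err(Error::Publish(rc, msg)) => DeliveryToken::from_error(msg, rc),
        // try_publish() should only fail with Error::Publish.
        Err(_) => unreachable!(),
    }
}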

Now the question is whether, for consistency, there should be try_connect(), try_subscribe(), try_unsubscribe(), etc.

Next I will attempt to implement a try_wait() for the Token, as discussed above.

fpagliughi commented 3 years ago

OK. The Token::try_wait() is in as well. (This is currently all in the develop branch).

fpagliughi commented 3 years ago

OK, I pushed out both of these new features in a v0.9.1 release. To review, you can get the result of a publish in two ways without having to block and wait for the operation to fully complete.

With AsyncClient or Topic functions for try_publish():

match cli.try_publish(msg) {
    // If OK, then queuing completed and we can now wait on the token.
    Ok(tok) => if let Err(err) = tok.wait() {
        eprintln!("Error sending message: {}", err);
    },
    // An error queuing the message
    Err(mqtt::Error::Publish(rc, msg)) => {
        eprintln!("Error creating/queuing the message: {}", rc);
        // Now we can do something with `msg`
    },
    // This should never happen
    Err(err) => panic!("Unexpected error: {}", err),
}

or with Token::try_wait():

let mut tok = cli.publish(msg);

match tok.try_wait() {
    // Queuing succeeded; the operation is still in progress.
    None => {}
    // Queuing succeeded and the operation already completed, with server response `rsp`.
    Some(Ok(rsp)) => { /* use `rsp` */ }
    // Queuing or the operation failed with error `err`.
    Some(Err(err)) => { /* handle `err` */ }
}

I'm curious if people would have a preference for one way or the other. But they each took so little code to implement I have no problem keeping them both indefinitely.
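And for async code, note that the token returned by try_publish() can still be .await'ed instead of calling wait(), so the first example could also be written like this (a sketch, assuming an async context):

match cli.try_publish(msg) {
    // Queued successfully; await delivery without blocking the task.
    Ok(tok) => if let Err(err) = tok.await {
        eprintln!("Error sending message: {}", err);
    },
    // Queuing failed; the original message is recovered in `msg`.
    Err(mqtt::Error::Publish(rc, msg)) => {
        eprintln!("Error creating/queuing the message: {}", rc);
        // `msg` could be retried or logged here.
    },
    Err(err) => panic!("Unexpected error: {}", err),
}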

svanharmelen commented 3 years ago

Thanks @fpagliughi, really nice to have this in!

I'll be off for another week, but will certainly test and play with all the new stuff soon after 👍🏻

svanharmelen commented 3 years ago

Just as a heads-up, some personal things came up so I haven't gotten to work on this yet. But it's still high on my list, so I will pick it up shortly...

svanharmelen commented 3 years ago

I had some time to play and test with this, and the new features added in v0.9.1 work like a charm 👍🏻 Thanks!