eclipse / paho.mqtt.rust

paho.mqtt.rust
Other
525 stars 102 forks source link

AsyncClient::get_stream() support unbounded channel #191

Closed fbrouille closed 1 year ago

fbrouille commented 1 year ago

Sometimes, a MQTT broker provides a lot of topics with a retain message for each of them. Within current implementation, AsyncClient relies on a bounded channel that may be fully filled very quickly, even if messages are consumed and processed very fast. Fully filled the bounded channel results in losing messages. Note that I tried to determinate the relevant channel buffer capacity, but this might be difficult. This PR aims to fix losing messages by using an unbounded channel if buffer_sz is 0. This way, the underlying channel will grow up automatically to buffer all received messages. Of course, this is not the ideal solution for all use cases because of the memory footprint. I think that a MQTT broker usually does not provide so many topics with retain messages. So generally setting buffer_sz to a positive number is the way to go.

fpagliughi commented 1 year ago

A few people have asked about this kind of thing since the original sync consumer. I'll merge this with the next point release, but may reconsider the API... having a special integer value for "no limit" seems like such a C thing to do :-)

Maybe an Into<Option<usize>>? Or am I getting complicated?

But also, we should be consistent with start_consuming() although these are backwards, I guess.

fbrouille commented 1 year ago

IMO Option<usize> makes sense: None for an unbounded channel and Some(capacity) for a bounded channel. I have also added some comments on the parameter buffer_sz.

fpagliughi commented 1 year ago

Yeah, nice. Thanks for updating it. Maybe I'll rename the parameter buffer_limit or something like that, then the option will make more sense... A "limit" of "None" logically means unbounded.

But I think I will take it a step further and make it into an Into<Option<usize>>. That way it won't break backward compatibility with the existing code. You can say:

cli.get_stream(32);

which would convert to cli.get_stream(Some(32)) automatically.

Thanks for the PR.

fpagliughi commented 1 year ago

To be clear... The PR, as submitted, broke the build of the examples:

error[E0308]: mismatched types
    --> examples/async_subscribe.rs:69:39
     |
69   |         let mut strm = cli.get_stream(25);
     |                            ---------- ^^ expected enum `Option`, found integer
     |                            |
     |                            arguments to this method are incorrect
     |
     = note: expected enum `Option<usize>`
                found type `{integer}`
note: associated function defined here
    --> /home/fmp/mqtt/paho.mqtt.rust/src/async_client.rs:1119:12
     |
1119 |     pub fn get_stream(&mut self, buffer_sz: Option<usize>) -> AsyncReceiver<Option<Message>> {
     |            ^^^^^^^^^^
help: try wrapping the expression in `Some`
     |
69   |         let mut strm = cli.get_stream(Some(25));
     |                                       +++++  +

but changing it to an Into<Option<usize>> gets the functionality you want without breaking backward compatibility.

pub fn get_stream<L>(&mut self, buffer_lim: L) -> AsyncReceiver<Option<Message>>
where
    L: Into<Option<usize>>,
{
    let (tx, rx) = match buffer_lim.into() {
        None => async_channel::unbounded(),
        Some(lim) => async_channel::bounded(lim),
    };

    ...
}

Plus it's nicer to be able to call the function for a bounded channel without needing the Some().

fbrouille commented 1 year ago

thank you for the fix

fpagliughi commented 1 year ago

Thanks again for the PR. I will update the synchronous channel version to do the same thing when I do the next major update, when I can make some breaking changes (v0.13)