AtherEnergy / rumqtt

Pure rust mqtt cilent
The Unlicense
202 stars 71 forks source link

Cloning channels in public interface. #172

Open leoniloris opened 5 years ago

leoniloris commented 5 years ago

Closes #171

leoniloris commented 5 years ago

Hello @tekjar. Could you please take a look into this?

tekjar commented 5 years ago

@leoniloris I'll check this today. Wish github had some sort of reminder option

tekjar commented 5 years ago

@leoniloris @TheBestJohn I'm not sure if cloning the channel everytime you do a publish is the right way to go here because I'm not sure of the costs involved (Remember channel implementation doubling the capacity for every clone).

I want to understand how "usefulness of channels" and removing &muts are related. What wrong with cloning the client itself?

leoniloris commented 5 years ago

Thanks for the answer @tekjar !

Do you have a suggestion?

TheBestJohn commented 5 years ago

Adding my use case here, I've been playing with connecting this library to the Rocket web framework. I'm receiving a POST request, doing some auth and validation, then sending an MQTT message. Rocket can manage the state of anything with the <Send> and <Sync> traits. However, needing to be borrowed mutably would force me to put this entire struct into a Mutex if I wanted my requests to be properly async leaving me with State<Mutex<MqttClient>>. This change makes using it with the library much more straightforward but I do admit I hadn't looked at any of the memory consumption factors when I did my testing, just performance.

as a note, if you want to do this you will have to excise the ALPN functionality as Rocket uses an older version of Ring and thus older rustls, and thus has a different type for the ALPN configurations &[String] instead of &[Vec<u8>]. I'm trying to figure out a fix for this but in the meantime I've just removed it from the lib locally

TheBestJohn commented 5 years ago

Also according to the documentation on crossbeam here.

Note that cloning only creates a new handle to the same sending or receiving side. It does not create a separate stream of messages in any way

and shortly thereafter they say that

It's also possible to share senders and receivers by reference

This seems to suggest that the two methods to do this are about the same.

tekjar commented 5 years ago

@TheBestJohn crossbeam channels are convenient. We don't have to use &mut to send data over crossbeam channel. But we don't use crossbeam channel to handle user requests as it doesn't support futures Stream.

The problem with futures channel is capacity doubling for every temporary clone. I don't think this is the correct way to go here. I think @stjepang is working on crossbeam channel for async-std. We'll have to wait and see

Rocket can manage the state of anything with the <Send> and <Sync> traits. However, needing to be borrowed mutably would force me to put this entire struct into a Mutex if I wanted my requests to be properly async leaving me with State<Mutex<MqttClient>>

Just to make sure that I understand this completely, You have a struct which has both rocket handler and MqttClient where rocket handler is send + sync while MqttClient is not causing the inconvenience?

TheBestJohn commented 5 years ago

Oh it's not using crossbeam there. Sorry, my mistake.

edit: Ah and looking into this I can see you here: https://github.com/crossbeam-rs/crossbeam-channel/issues/22

I've gone down the rabbit-hole and ended up here: https://stjepang.github.io/2019/03/02/new-channels.html Pretty interesting read and I'm starting to get what the problem is.

Just to make sure that I understand this completely, You have a struct which has both rocket handler and MqttClient where rocket handler is send + sync while MqttClient is not causing the inconvenience?

Sort of. State<T> allows for Rocket to take ownership of any type that implements Send and Sync. While MqttClient itself is thread safe, it's publish method is inherently racy as you can't have two threads trying to get a mutable ref to it at the same time. The only other way to fix this, as far as I understand, would be to wrap the client in a mutex or RWLock and lock it whenever I want to add something to the send queue. With that comes a whole bunch of error handling and managing unsent publishes for re-try which at that point it's almost better to also wrap it in an Arc and put the whole interaction into another thread or some other form that would essentially work out to this functionality.

It just seems the nature of this lib being focused on async with the server, lends itself to the idea that the library should be able to accommodate an application or library's interaction with rumqtt in an async manner as well. Maybe that's naïve and asking too much? I don't know, and please don't take it as a criticism of your work. I don't fully understand the library but hope to know more.

There's a thread on mutable refs in State here

I can provide a MWE for demo purposes as well if you'd like to take a look.

tekjar commented 5 years ago

Yeah a minimal example will help

TheBestJohn commented 5 years ago

Here you are. It uses my fork of rumqtt because of issues with rustls versions. Swap it back to head if you'd like to see those too

https://github.com/TheBestJohn/mwe_rocket_rumqtt

TheBestJohn commented 5 years ago

Updated the MWE to use Rocket head directly with this branch. Rocket updated it's TLS stuff so the ALPN issues no longer arise.