inre / rust-mq

RustMQ is the MQTT client written on pure Rust.
MIT License
190 stars 28 forks source link

Using a single client to subscibe and publish at the same time #6

Open tekjar opened 8 years ago

tekjar commented 8 years ago

Is it possible to use same client to subscribe (and read incoming messages) and publish at the same time?

loop {
        match client.await().unwrap() {
            Some(message) => println!("{:?}", message),
            None => {
                println!(".");
            }
        }
    }

Since this is blocking, where do I do my client.publish() ? Can you please write an example where a client does both subscribe and publish?

inre commented 8 years ago

I have a plan to do this. There is an example from future version of client:

...
let mut (sender, client) = opts.async_connect(address.as_str(), netopt).unwrap();

thread::spawn(move || {
     match listener.await().unwrap() {
            Some(message) => println!("{:?}", message),
            None => {
                println!(".");
            }
        }
     }
});

let sender2 = sender.clone();
thread::spawn(move || {
   sender2.publish("topic2", "value2", PubOpt::at_most_once());
});

loop {
   sender.publish("topic", "value", PubOpt::at_most_once());
   //... sleep ...
}

I suppose there is no need Mutex required.

tekjar commented 8 years ago

That's awesome. Thanks :)

tekjar commented 8 years ago

Just a heads up. I just wanted to try what you were discussing and segregated the code and added necessary locks. (Lot of clean up and testing pending)

https://github.com/kteza1/rust-mq/blob/async/mqttc/src/client.rs

Example https://github.com/kteza1/rust-mq/blob/async/mqttc/examples/async.rs

To be fixed: https://github.com/kteza1/rust-mq/blob/async/mqttc/src/client.rs#L391 https://github.com/kteza1/rust-mq/blob/async/mqttc/src/client.rs#L479

inre commented 8 years ago

Wow! Lovely! it's almost something that I want! only one thing... I want to stay the way to work without mutex at all. So it needs enum or something, maybe inside Connection for instance.

tekjar commented 8 years ago

I know I've shared some unnecessary stuff (like incoming_*) here(I'll move these out) but queues like outgoing_ack, outgoing_rec should be shared between sender and listener threads. Right now, locks are held only for this short duration while updating these queues. I'm not entirely sure how to remove locks here. If you could guide me, I'll do it :).

On a different note, I don't know how to push these upstream, not for immediate merge but to keep track of these changes and review them.

inre commented 8 years ago

I can't explane this for now. I see some disadvantages in my mqttc crate and I want to fix it before I'll start an async support. You can use your implementation some time.

I suppose I'll do separate classes Client and (AsyncClient, Publisher). Client has trait (Mqttc rename to PubSub) and MqttAwait AsyncClient has trait MqttAwait Publisher has a trait PubSub

francozappa commented 7 years ago

Hi @inre and @tekjar ,

Any updates about the subscribe and publish feature?

Thanks.

tekjar commented 7 years ago

@francozappa I'm not sure about this feature in this crate but I ended up writing another one with a different architecture

https://github.com/Ather-Energy/RuMqtt