tower-rs / tower

async fn(Request) -> Result<Response, Error>
https://docs.rs/tower
MIT License

MQTT client adapter / framework #744

Closed mbodmer closed 1 year ago

mbodmer commented 1 year ago

Hi, I am trying to use the rumqttc client from within tower, and possibly to build a sensible framework around it (pub/sub and req/rep), including a router. However, I am new to both tower and Rust and am struggling in several places, most notably with how to pass the client to the handlers in a transparent manner.

Basic Approach

I've started with pub/sub, modeling only incoming Publish packets as the Request, with no Response other than possible errors.

I would like usage to be simple, akin to (and hopefully compatible with) axum and the like. Eventually it should be possible to create a bridge (a hybrid app?) between MQTT and e.g. hyper, axum, or tonic.

let svc = tower::service_fn(handler);

let mut mqttoptions = MqttOptions::new("rumqtt-async", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

rumqtt_tower::serve(mqttoptions.clone(), svc).await?;

The serve function takes the service (the middleware stack), creates the MQTT client, and runs the event loop. Incoming messages are forwarded to the service.

use std::convert::Infallible;

use rumqttc::{AsyncClient, MqttOptions, Publish};
use tower::{Service, ServiceExt};

pub async fn serve<S>(mqttoptions: MqttOptions, mut service: S) -> Result<()>
where
    S: Service<Publish, Response = (), Error = Infallible> + Clone + Send + 'static,
    S::Future: Send,
{
    // `client` is not used yet; a clone of it is what the handlers would need.
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);

    // Drive the event loop on a background task.
    tokio::spawn(async move {
        loop {
            let notification = eventloop.poll().await;
            match notification {
                Ok(rumqttc::Event::Incoming(rumqttc::Incoming::Publish(msg))) => {
                    log::debug!("message: {:?}", msg);
                    // tower's `Service` contract expects readiness before `call`.
                    if let Ok(svc) = service.ready().await {
                        let _response = svc.call(msg).await;
                    }
                }
                Ok(rumqttc::Event::Incoming(rumqttc::Incoming::ConnAck(msg))) => {
                    log::debug!("connected to MQTT broker. {:?}", msg);
                }
                Ok(_) => {
                    log::debug!("event: {:?}", notification);
                }
                Err(err) => {
                    log::warn!("MQTT Error: {}", err);
                }
            }
        }
    });

    Ok(())
}

The middlewares and handlers would need a clone of the client handle returned by AsyncClient::new() in order to publish messages and subscribe to topics over MQTT.

How to pass the client handle?

My first struggle is how I would pass the client handle to a middleware, e.g. the router.

Looking at axum, I see it would be possible to pass it in a state object. With this approach, however, I don't see a way to handle it generically, and I also couldn't hide the MQTT client creation: the client handle has to be known beforehand and put inside the State.

If I passed the handle as an axum extractor, I don't see how a middleware listening to e.g. HTTP events could access the client.

What are common patterns, compatible with other frameworks built on tower, for handling situations like these?
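One pattern that fits the question above is to keep a clone of the handle as a field of the service itself, and to let a serve-like function create the client internally and pass it to a factory closure. The following is only a minimal sketch under stated assumptions: `ClientHandle`, `Handler`, `Incoming`, `Echo`, and `serve_with` are hypothetical stand-ins written with the standard library only. They are not the rumqttc or tower APIs; the real `tower::Service` is asynchronous and has `poll_ready`.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical stand-in for a cloneable MQTT client handle
/// (in the spirit of a client handle, but NOT the rumqttc API).
#[derive(Clone)]
pub struct ClientHandle {
    outbox: Sender<(String, Vec<u8>)>,
}

impl ClientHandle {
    /// Queue an outgoing message; send errors are ignored in this sketch.
    pub fn publish(&self, topic: &str, payload: &[u8]) {
        let _ = self.outbox.send((topic.to_string(), payload.to_vec()));
    }
}

/// Simplified, synchronous stand-in for `tower::Service`
/// (no `poll_ready`, no futures).
pub trait Handler<Req> {
    fn call(&mut self, req: Req);
}

/// Stand-in for an incoming publish.
pub struct Incoming {
    pub topic: String,
    pub payload: Vec<u8>,
}

/// A handler that owns its own clone of the client handle.
#[derive(Clone)]
pub struct Echo {
    pub client: ClientHandle,
}

impl Handler<Incoming> for Echo {
    fn call(&mut self, req: Incoming) {
        // Republish the payload on a derived topic.
        let reply_topic = format!("{}/echo", req.topic);
        self.client.publish(&reply_topic, &req.payload);
    }
}

/// Hypothetical serve-like entry point: it creates the client itself and
/// hands a clone to `make_handler`, so the caller never builds the client.
/// Returns everything the handler published, for inspection.
pub fn serve_with<H, F>(make_handler: F, incoming: Vec<Incoming>) -> Vec<(String, Vec<u8>)>
where
    H: Handler<Incoming>,
    F: FnOnce(ClientHandle) -> H,
{
    let (tx, rx) = channel();
    let client = ClientHandle { outbox: tx };
    let mut handler = make_handler(client.clone());
    for msg in incoming {
        handler.call(msg);
    }
    // Collect what is already queued without blocking.
    rx.try_iter().collect()
}
```

With this shape, a call such as `serve_with(|client| Echo { client }, messages)` keeps client creation hidden inside the serve function while every handler still gets its own handle. Whether the same shape carries over cleanly to a real tower stack is an assumption of this sketch.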

How to extend for Req/Rep

MQTT 5 also supports req/rep. This is mainly modeled with additional metadata on the messages. How can these different patterns be modeled side by side? Are there any examples?
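In MQTT 5 the metadata in question is the Response Topic and Correlation Data properties of a PUBLISH. One way to model both patterns side by side is to classify each incoming message by whether it carries a response topic. This is a rough sketch only: `Message`, `Inbound`, `classify`, and `build_reply` are hypothetical names for illustration, not types from rumqttc or tower.

```rust
/// Stand-in for the parts of an MQTT 5 PUBLISH that matter here.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub topic: String,
    pub payload: Vec<u8>,
    pub response_topic: Option<String>,
    pub correlation_data: Option<Vec<u8>>,
}

/// The two interaction patterns, side by side.
#[derive(Debug, Clone, PartialEq)]
pub enum Inbound {
    /// Plain pub/sub: nobody expects an answer.
    Event { topic: String, payload: Vec<u8> },
    /// Req/rep: the sender named a topic to answer on.
    Request {
        topic: String,
        payload: Vec<u8>,
        reply_to: String,
        correlation: Option<Vec<u8>>,
    },
}

/// A message with a response topic is treated as a request,
/// anything else as a plain event.
pub fn classify(msg: Message) -> Inbound {
    match msg.response_topic {
        Some(reply_to) => Inbound::Request {
            topic: msg.topic,
            payload: msg.payload,
            reply_to,
            correlation: msg.correlation_data,
        },
        None => Inbound::Event {
            topic: msg.topic,
            payload: msg.payload,
        },
    }
}

/// Build the reply for a request: publish to the response topic and
/// echo the correlation data. Events yield no reply.
pub fn build_reply(inbound: &Inbound, payload: Vec<u8>) -> Option<Message> {
    match inbound {
        Inbound::Request { reply_to, correlation, .. } => Some(Message {
            topic: reply_to.clone(),
            payload,
            response_topic: None,
            correlation_data: correlation.clone(),
        }),
        Inbound::Event { .. } => None,
    }
}
```

A router could then dispatch `Inbound::Event` to services whose response is `()` and `Inbound::Request` to services that return a payload, with the reply built as above. That split is a design assumption, not something tower prescribes.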


Sorry for my somewhat diffuse questions; if you need more details, I will try to describe things more precisely. I am thankful for any hints and pointers. Also tell me if I am trying to use the wrong tool, even though I think it would fit in pretty well and I just don't really see it yet :-)

Thanks and regards, Marc

mbodmer commented 1 year ago

I am going to close this issue, as no answers came in. There was an interesting discussion on Discord about state, extractors and the like.

It looks like the way axum handles state is the way to go for now. In my opinion it would make sense to include this in tower so it can be shared across middlewares.

As I understand it, however, this would be a breaking change, since it would mean adding another parameter for a generic state to Service::call().

Almost any application using tower has to handle state somehow.
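To illustrate the trade-off around a state parameter, here is a small sketch, assuming simplified synchronous stand-ins rather than tower's real trait. `Service` mirrors only the shape of `tower::Service::call`, while `StatefulService`, `WithState`, and `Prefixer` are hypothetical. It shows that a state-taking variant can be adapted onto the unchanged trait by bundling the state into a wrapper, which is one way to avoid changing the signature of `call`.

```rust
/// Simplified, synchronous stand-in for `tower::Service`.
pub trait Service<Req> {
    type Response;
    fn call(&mut self, req: Req) -> Self::Response;
}

/// Hypothetical state-aware variant with an extra `state` parameter.
pub trait StatefulService<St, Req> {
    type Response;
    fn call(&mut self, state: &St, req: Req) -> Self::Response;
}

/// Adapter that bundles a state value with a state-aware service,
/// so the pair can be used wherever the plain trait is expected.
pub struct WithState<T, St> {
    pub inner: T,
    pub state: St,
}

impl<T, St, Req> Service<Req> for WithState<T, St>
where
    T: StatefulService<St, Req>,
{
    type Response = T::Response;

    fn call(&mut self, req: Req) -> Self::Response {
        // Supply the stored state on every call.
        self.inner.call(&self.state, req)
    }
}

/// Example state-aware service: prefixes the request with the state.
pub struct Prefixer;

impl StatefulService<String, String> for Prefixer {
    type Response = String;

    fn call(&mut self, state: &String, req: String) -> String {
        format!("{}{}", state, req)
    }
}
```

Wrapping `Prefixer` as `WithState { inner: Prefixer, state: "dev/".to_string() }` gives a value that implements the plain stand-in `Service`, so calling it with `"temp"` yields `"dev/temp"`. Whether such an adapter would be ergonomic enough across real middleware stacks is left open here.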