bytebeamio / rumqtt

The MQTT ecosystem in rust
Apache License 2.0
1.66k stars 255 forks source link

rumqttc - Get `pkid` on `AyncClient::subscribe` (v5) #605

Open sgasse opened 1 year ago

sgasse commented 1 year ago

We want to know which subscriptions failed when receiving a Event::Incoming(Packet::SubAck). The SUBACK that answers on a SUB has the same packet identifier (pkid) according to the MQTT standard. However currently, we do not get the pkid of a Subscribe which is created by a call to AsyncClient::subscribe.

This is related to the open issue #349 which tracks the same problem for AsyncClient::publish. Apparently, there was a PR merged at some point addressing this (#351) but the method signatures have changed again since then.

Is it planned to have pkids of Subscribe and Publish returned on AsyncClient::subscribe, AsyncClient::subscribe_many etc. returned at some point?

sgasse commented 1 year ago

@svet-b suggested a workaround for the Publish case here, namely inspecting the Event::Outgoing events on the EventLoop.

As far as I can tell, calls to AsyncClient::subscribe will place Requests on an internal flume channel. Can we assume that the order of Event::Outgoing(Outgoing::Subscribe(pkid)) will match the order in which we called AsyncClient::subscribe?

A quick example (stripped to the basics) with book-keeping is shown below. Do you think this works correctly?

use std::collections::{HashMap, VecDeque};

use rumqttc::{
    v5::{
        mqttbytes::{v5::Packet, Filter, QoS, SubscribeReasonCode},
        AsyncClient, Event, MqttOptions,
    },
    Outgoing,
};
use tokio::sync::mpsc;

pub async fn run(mut topics_rx: mpsc::Receiver<Vec<String>>) {
    let url = "mqtt://localhost:1234?client_id=simon";
    let url = url::Url::parse(url).unwrap();
    let mqttoptions = MqttOptions::try_from(url).unwrap();

    let (client, mut eventloop) = AsyncClient::new(mqttoptions.clone(), 10);

    let mut pkid_topics: HashMap<u16, Vec<String>> = HashMap::new();
    let mut inflight_subscriptions: VecDeque<Vec<String>> = VecDeque::new();

    loop {
        tokio::select! {
            event = eventloop.poll() => {
                match event {
                    Ok(event) => match event {
                        // Catch outgoing subscriptions for book-keeping.
                        Event::Outgoing(Outgoing::Subscribe(pkid)) => {
                            match inflight_subscriptions.pop_front() {
                                None => println!("Error getting matching inflight topics"),
                                Some(topics) => {
                                    pkid_topics.insert(pkid, topics);
                                }
                            }
                        },
                        Event::Incoming(event) => match *event {
                            // Catch authorization issues.
                            Packet::SubAck(sub_ack, _props) => {
                                // Try to find the topics by pkid
                                match pkid_topics.get(&sub_ack.pkid) {
                                    Some(topics) => {
                                        debug_assert_eq!(sub_ack.return_codes.len(), topics.len());
                                        for (return_code, topic) in sub_ack.return_codes.iter().zip(topics.iter()) {
                                            match return_code {
                                                SubscribeReasonCode::Success(_) | SubscribeReasonCode::QoS0 | SubscribeReasonCode::QoS1 | SubscribeReasonCode::QoS2 => {
                                                    println!("Successfully subscribed to {}.", topic);
                                                }
                                                SubscribeReasonCode::NotAuthorized => {
                                                    println!("Not authorized to subscribe to topic {}.", topic);
                                                },
                                                err => {
                                                    println!("Error in subscribing to topic {}: {:?}.", topic, err);
                                                }
                                            }
                                        }
                                    }
                                    None => {
                                        println!("Could not find topics for pkid {}", sub_ack.pkid);
                                        if !sub_ack.return_codes.iter().all(|r| match r {
                                            SubscribeReasonCode::Success(_) | SubscribeReasonCode::QoS0 | SubscribeReasonCode::QoS1 | SubscribeReasonCode::QoS2 => {
                                                true
                                            }
                                            _ => false
                                        } ) {
                                            println!("Error subscribing to some unknown topics");
                                        }
                                    }
                                }
                            }
                            _ => (),
                        }
                        _ => (),
                    }
                    Err(e) => {
                        println!("MQTT client error: {e:?}");
                    }
                }
            },
            topics = topics_rx.recv() => match topics {
                None => break,
                Some(topics) => {
                    if let Ok(_) = client.subscribe_many(topics.iter().map(|t| Filter::new(t, QoS::AtLeastOnce))).await {
                        inflight_subscriptions.push_back(topics);
                    }
                }
            },
            else => break,
        }
    }
}