bytebeamio / rumqtt

The MQTT ecosystem in rust
Apache License 2.0
1.65k stars 255 forks source link

feat: return token when request is made, resolve token when request handling completes #916

Open de-sh opened 1 month ago

de-sh commented 1 month ago

Solves #805

Type of change

New feature, that notifies the requester when their requests are acknowledged by the broker(in the case of QoS 1/2 Publishes and Subscribe/Unsubscribe) or when they are written to TCP buffer(for all other requests).

BREAKING: pending definition changes from VecDeque<Request> to VecDeque<(Request, Option<PromiseTx>)>.

Checklist:

coveralls commented 1 month ago

Pull Request Test Coverage Report for Build 11861258827

Details


Changes Missing Coverage Covered Lines Changed/Added Lines %
rumqttc/src/eventloop.rs 1 2 50.0%
rumqttc/src/v5/eventloop.rs 0 1 0.0%
rumqttc/src/tokens.rs 21 45 46.67%
rumqttc/src/state.rs 131 169 77.51%
rumqttc/src/v5/state.rs 111 180 61.67%
rumqttc/src/client.rs 25 140 17.86%
rumqttc/src/v5/client.rs 5 159 3.14%
<!-- Total: 294 696 42.24% -->
Files with Coverage Reduction New Missed Lines %
rumqttc/src/state.rs 1 83.68%
rumqttc/src/eventloop.rs 5 85.27%
rumqttd/src/link/timer.rs 8 0.0%
rumqttc/src/v5/state.rs 9 65.34%
rumqttc/src/v5/client.rs 15 12.79%
rumqttc/src/client.rs 15 32.37%
rumqttd/src/link/remote.rs 25 46.03%
rumqttc/src/v5/eventloop.rs 28 7.51%
rumqttd/src/link/bridge.rs 43 0.0%
<!-- Total: 149 -->
Totals Coverage Status
Change from base Build 10511632669: 1.5%
Covered Lines: 6346
Relevant Lines: 16864

💛 - Coveralls
xiaocq2001 commented 1 month ago

Glad to see the progress here. Some of the things to discuss:

  1. Is there any example on how the feature is used? I tried to use following code to test

    println!("--- Publishing messages and wait for ack ---");
    let mut set = JoinSet::new();
    
    let ack_promise = client
        .publish("hello/world", QoS::AtMostOnce, false, vec![1; 1])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });
    
    let ack_promise = client
        .publish("hello/world", QoS::AtLeastOnce, false, vec![1; 2])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });
    
    let ack_promise = client
        .publish("hello/world", QoS::ExactlyOnce, false, vec![1; 3])
        .await
        .unwrap();
    set.spawn(async move {
        ack_promise.await
    });
    
    while let Some(res) = set.join_next().await {
        println!("Acknoledged = {:?}", res?);
    }

    The output shows "RecvError"

    --- Publishing messages and wait for ack ---
    DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 0, Payload Size = 1
    Event = Outgoing(Publish(0))
    DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 4, Payload Size = 2
    Event = Outgoing(Publish(4))
    DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 5, Payload Size = 3
    Event = Outgoing(Publish(5))
    Acknoledged = Err(RecvError(()))
    Acknoledged = Err(RecvError(()))
    Acknoledged = Err(RecvError(()))
    Event = Incoming(Publish(Publish { dup: false, qos: AtMostOnce, retain: false, topic: b"hello/world", pkid: 0, payload: b"\x01", properties: None }))
    Event = Incoming(PubAck(PubAck { pkid: 4, reason: Success, properties: None }))
    Event = Incoming(PubRec(PubRec { pkid: 5, reason: Success, properties: None }))
    Event = Outgoing(PubRel(5))
    Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 1, payload: b"\x01\x01", properties: None }))
    Event = Outgoing(PubAck(1))
    Event = Incoming(Publish(Publish { dup: false, qos: ExactlyOnce, retain: false, topic: b"hello/world", pkid: 2, payload: b"\x01\x01\x01", properties: None }))
    Event = Outgoing(PubRec(2))
    Event = Incoming(PubComp(PubComp { pkid: 5, reason: Success, properties: None }))
    Event = Incoming(PubRel(PubRel { pkid: 2, reason: Success, properties: None }))
    Event = Outgoing(PubComp(2))

    In outgoing_publish, the tx is not saved to ack_waiter, it's dropped! Maybe you can add (with QoS0 notification in discuss 2)

        if publish.qos != QoS::AtMostOnce {
            self.ack_waiter[pkid as usize] = tx;
        } else {
            if let Some(tx) = tx {
                tx.resolve();
            }
        }

    after

        let event = Event::Outgoing(Outgoing::Publish(pkid));
        self.events.push_back(event);
  2. It seems in QoS0, there is no notification, is it worthy that we have notification on QoS0 packet sent (outgoing_publish).

de-sh commented 1 month ago

Thanks for the review, two more things I have been thinking about with respect to the interface are as follows:

  1. Returning pkid instead of ().
  2. Errors with more context about why a request was refused by the broker, both subscribe and unsub acks have reason codes as response. Should this just return the acknowledgement packets received from the broker or repackage the same into a more presentable type?
xiaocq2001 commented 1 month ago

Thanks for the review, two more things I have been thinking about with respect to the interface are as follows:

  1. Returning pkid instead of ().
  2. Errors with more context about why a request was refused by the broker, both subscribe and unsub acks have reason codes as response. Should this just return the acknowledgement packets received from the broker or repackage the same into a more presentable type?

For 1, that's great. That also helps on 2, before we decide to put more things in error cases, the pkid helps to extract the error packet from event loop events. For 2, I vote the acknowledgement packets, since that's not only the reason code, but also with reason string and maybe other user defined properties.