bytebeamio / rumqtt

The MQTT ecosystem in rust
Apache License 2.0
1.53k stars 234 forks source link

Connect with session_expiry_interval for MQTT v5 #853

Open xiaocq2001 opened 2 months ago

xiaocq2001 commented 2 months ago

Expected Behavior

When connecting to the broker, setting session_expiry_interval should be supported.

Current Behavior

There is no API to set session_expiry_interval for the CONNECT request.

Failure Information (for bugs)

Context

Failure Logs

Please include any relevant log snippets or files here.

While running the example async_manual_acks_v5, the broker does not restore the session and re-send the publishes, because the session_expiry_interval value 0xFFFFFFFF is not offered on connection:

Event = Incoming(ConnAck(ConnAck { session_present: true, code: Success, properties: Some(ConnAckProperties { session_expiry_interval: None, receive_max: Some(20), max_qos: None, retain_available: None, max_packet_size: None, assigned_client_identifier: None, topic_alias_max: Some(10), reason_string: None, user_properties: [], wildcard_subscription_available: None, subscription_identifiers_available: None, shared_subscription_available: None, server_keep_alive: None, response_information: None, server_reference: None, authentication_method: None, authentication_data: None }) }))
 DEBUG rumqttc::v5::state > Subscribe. Topics = [Filter { path: "hello/world", qos: AtLeastOnce, nolocal: false, preserve_retain: false, retain_forward_rule: OnEverySubscribe }], Pkid = 1
Event = Outgoing(Subscribe(1))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 2, Payload Size = 1
Event = Outgoing(Publish(2))
 DEBUG rumqttc::v5::state > SubAck Pkid = 1, QoS = AtLeastOnce
Event = Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(AtLeastOnce)], properties: None }))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 21, payload: b"\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 2, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 3, Payload Size = 2
Event = Outgoing(Publish(3))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 22, payload: b"\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 3, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 4, Payload Size = 3
Event = Outgoing(Publish(4))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 23, payload: b"\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 4, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 5, Payload Size = 4
Event = Outgoing(Publish(5))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 24, payload: b"\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 5, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 6, Payload Size = 5
Event = Outgoing(Publish(6))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 25, payload: b"\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 6, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 996.130476ms, last outgoing request before 996.385573ms
Event = Outgoing(PingReq)
Event = Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 7, Payload Size = 6
Event = Outgoing(Publish(7))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 26, payload: b"\x01\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 7, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 8, Payload Size = 7
Event = Outgoing(Publish(8))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 27, payload: b"\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 8, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 9, Payload Size = 8
Event = Outgoing(Publish(9))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 28, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 9, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 10, Payload Size = 9
Event = Outgoing(Publish(10))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 29, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 10, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 11, Payload Size = 10
Event = Outgoing(Publish(11))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 30, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 11, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 987.076374ms, last outgoing request before 987.813679ms
Event = Outgoing(PingReq)
Event = Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Disconnect with NormalDisconnection
Event = Outgoing(Disconnect)
Error = MqttState(ConnectionAborted)
Incoming(ConnAck(ConnAck { session_present: false, code: Success, properties: Some(ConnAckProperties { session_expiry_interval: None, receive_max: Some(20), max_qos: None, retain_available: None, max_packet_size: None, assigned_client_identifier: None, topic_alias_max: Some(10), reason_string: None, user_properties: [], wildcard_subscription_available: None, subscription_identifiers_available: None, shared_subscription_available: None, server_keep_alive: None, response_information: None, server_reference: None, authentication_method: None, authentication_data: None }) }))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.002052179s, last outgoing request before 5.020963659s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.005117266s, last outgoing request before 5.005405529s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.000096725s, last outgoing request before 5.00081033s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.001096962s, last outgoing request before 5.001506118s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
xiaocq2001 commented 2 months ago

I created a PR to fix the issue; please check https://github.com/bytebeamio/rumqtt/pull/854

Now the broker resends the publishes:

 DEBUG rumqttc::v5::state > Disconnect with NormalDisconnection
Event = Outgoing(Disconnect)
Error = MqttState(ConnectionAborted)
Incoming(ConnAck(ConnAck { session_present: true, code: Success, properties: Some(ConnAckProperties { session_expiry_interval: None, receive_max: Some(20), max_qos: None, retain_available: None, max_packet_size: None, assigned_client_identifier: None, topic_alias_max: Some(10), reason_string: None, user_properties: [], wildcard_subscription_available: None, subscription_identifiers_available: None, shared_subscription_available: None, server_keep_alive: None, response_information: None, server_reference: None, authentication_method: None, authentication_data: None }) }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 1, payload: b"\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 2, payload: b"\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 3, payload: b"\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 4, payload: b"\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 5, payload: b"\x01\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 6, payload: b"\x01\x01\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 7, payload: b"\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 8, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 9, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 10, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Outgoing(PubAck(1))
Outgoing(PubAck(2))
Outgoing(PubAck(3))
Outgoing(PubAck(4))
Outgoing(PubAck(5))
Outgoing(PubAck(6))
Outgoing(PubAck(7))
Outgoing(PubAck(8))
Outgoing(PubAck(9))
Outgoing(PubAck(10))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.001324026s, last outgoing request before 5.001175946s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.000821551s, last outgoing request before 5.001400427s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
swanandx commented 2 months ago

You can use set_connect_properties to set all kinds of v5 connection properties, e.g.:

let mut properties = ConnectProperties::new();
properties.session_expiry_interval = Some(u32::MAX);
mqttoptions.set_connect_properties(properties);
xiaocq2001 commented 1 month ago

> you can use set_connect_properties to set all kind of v5 connection properties, eg.
>
> let mut properties = ConnectProperties::new();
> properties.session_expiry_interval = Some(u32::MAX);
> mqttoptions.set_connect_properties(properties);

Ah, currently ConnectProperties is private; that's why I think we need an additional API, like the ones we added for receive_maximum, max_packet_size and other properties.