Open swanandx opened 6 months ago
We could make use of something like a RequestBuilder
which will allow us to code as follows:
let publish = RequestBuilder::publish()
    .topic("topic")
    .qos(QoS::AtLeastOnce)
    .payload(b"1231231312")
    .retain()
    .properties(_) // v5 only
    .build();
client.send(publish).await;
Why? Because topic, qos, etc are part of the publish/subscription request and not the message/filter.
We could make use of something like a RequestBuilder
I'm assuming it returns Request::Publish(Publish) ( or subscribe ):
- It would be hard to modify the request after build() and thus compromise the re-usability ( usually users would just change the payload )
- we need to check type of request in send() to properly handle the pkid logic ( or have different publish() and subscribe() )
- how would you specify multiple subscriptions in single packet? ( Each subscription includes a topic and a QoS level. )
Because topic, qos, etc are part of the publish/subscription request and not the message/filter.
reasoning was:
- for subscriptions we call the topics filters and use Filter internally ( as well as externally for subscribe_many ). So it wasn't much of a big change
- for publication, I wanted to use Topic but as every message can be published with different qos, I just named that struct Message
It's not a good idea to go forward with using Filter and Message as they aren't the requests themselves and QoS, retain etc are request params. Something like a Publish/SubscriptionBuilder seems to be the right way to go.
Technically, QoS and retain etc. aren't Request params; we use Request as a way to communicate with Eventloop, it's not in the standards, right?
As per MQTT Standards, term Topic Filters is used in subscriptions.
The Payload of a SUBSCRIBE packet contains a list of Topic Filters indicating the Topics to which the Client wants to subscribe
And Application Message for Publish,
A PUBLISH packet is sent from a Client to a Server or from a Server to a Client to transport an Application Message.
When you mention request, we are referring to Request used in rumqttc right? Or did I misunderstand something 😅
Something like a Publish/SubscriptionBuilder seems to be the right way to go.
Agree, but what will be the return type from that builder, which we pass down to client.publish(_) or client.subscribe(_)? If we can figure that out, it would be really great!
Returning the packet itself might be confusing for users as it will have a placeholder pkid; the actual pkid will be allocated only after/during the call to publish()!
what about something like this ( based on above suggestions @de-sh ):
let sub: SubscribeRequest = RequestBuilder::subscribe("topic", QoS::AtLeastOnce)
    // v5 only
    .preserve_retain()
    .nolocal()
    .retain_forward_rule(RetainForwardRule::Never)
    .build();
// build multiple SubscribeRequest if you want to subscribe_many
client.subscribe(sub)?;
let publish: PublishRequest = RequestBuilder::publish("topic", QoS::AtLeastOnce)
    .payload("")
    .retain()
    .build();
client.publish(publish)?;
for i in 1..=10 {
    // no need to clone, you can construct new request here!
    let mut p = publish.clone();
    p.payload(vec![1; i]);
    client.publish(p)?;
}
btw, can there be a better type than ..Request?
^ that is one of the reasons I find Filter and Message more sensible names ( also explained in above comment )! preferably, I would like to stick with them:
// can use just Filter::new(topic, qos) if don't need to set v5 options!
let sub: Filter = Filter::builder("topic", QoS::AtLeastOnce) // FilterBuilder
    // v5 only
    .preserve_retain()
    .nolocal()
    .retain_forward_rule(RetainForwardRule::Never)
    .build();
client.subscribe(sub)?;
let publish: Message = Message::builder("topic", QoS::AtLeastOnce) // MessageBuilder
    .payload("")
    .retain()
    .build();
client.publish(publish)?;
for i in 1..=10 {
    // no need to clone, you can construct new request here!
    let mut p = publish.clone();
    p.payload(vec![1; i]);
    client.publish(p)?;
}
Why not the following:
let sub: Subscribe = Subscribe::builder("topic", QoS::AtLeastOnce) // SubscriptionBuilder
    // v5 only
    .preserve_retain()
    .nolocal()
    .retain_forward_rule(RetainForwardRule::Never)
    .build();
client.subscribe(sub)?;
because:
- a Subscribe packet can have multiple filter & subscription options pairs, which we can't specify in this format. ( Subscription options include QoS, preserve_retain, etc. ) When we do subscribe_many(filters), a single Subscribe packet is created with those filters.
- with a Subscribe packet, users might refer to its pkid, thinking it is final ( though we can avoid this part with docs or something, but still a footgun )
So if I am not wrong, a Subscribe packet can contain multiple Subscriptions for the same request? In that case Subscription could just be used to replace Filter term. Plus we really don't need two methods against client, we could just impl From<Subscription> for Vec<Subscription> and pass both singleton and list to the same method as illustrated in this code sample.
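A minimal sketch of that single-method idea. The `Subscription`, `QoS`, and `Client` types below are hypothetical stand-ins written for illustration, not rumqttc's actual API, and the `sent` field only records what would go into each SUBSCRIBE packet:

```rust
// Hypothetical stand-ins for illustration; not rumqttc's real types.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum QoS {
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Subscription {
    pub filter: String,
    pub qos: QoS,
}

impl Subscription {
    pub fn new(filter: impl Into<String>, qos: QoS) -> Self {
        Self { filter: filter.into(), qos }
    }
}

// Lets a single Subscription be passed wherever a list is expected.
impl From<Subscription> for Vec<Subscription> {
    fn from(subscription: Subscription) -> Self {
        vec![subscription]
    }
}

#[derive(Default)]
pub struct Client {
    // Each inner Vec models the subscriptions carried by one SUBSCRIBE packet.
    pub sent: Vec<Vec<Subscription>>,
}

impl Client {
    // One method covers both the singleton and the list case.
    pub fn subscribe(&mut self, subscriptions: impl Into<Vec<Subscription>>) {
        self.sent.push(subscriptions.into());
    }
}
```

With this shape, `client.subscribe(Subscription::new("a/b", QoS::AtLeastOnce))` and `client.subscribe(vec![...])` both go through the same method, so a separate `subscribe_many` would not be strictly needed.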
In that case Subscription could just be used to replace Filter term
in protocol level as well? We might wanna be more inline with the terms as per standards right?
note: after implementing, and trying out from users pov, I think builders are overkill and are introducing unnecessary extra types. what can we do instead:
// Message { .. } constructor can be used if u want all fields
let mut message = Message::new("topic", QoS::AtMostOnce);
// set the fields if required
message.payload = "payload";
client.publish(message);
// same with Filter!
let mut filter = Filter::new("filter", QoS::AtMostOnce);
filter.preserve_retain = true;
client.subscribe(filter);
client can subscribe to a filter and client can publish a message
Filters were already exposed to the users, and were used by them, so it won't be a change. Only new type is Message!
to summarize changes:
- subscribe() takes Filter instead of string & qos ( similar to how it is with subscribe_many() currently )
- publish() takes Message instead of all arguments
I get your argument, yet the protocol description distinguishes between topic filters and subscription.
The subscription is a transfer contract between the client and programmer in how it will forward the filters to the broker, the filter on the other hand is a contract between the broker and the client that broker will forward all messages(with mentioned QoS, etc.) to the client on said topic filter. Similarly publish is a contract between client and programmer that the message will be delivered to the broker with said QoS while the message itself is the payload contained within and not the whole package it gets transported in.
ah gotcha!
Subscription == Filter + Options! Will try to replace Filter with Subscription for users and see how it goes ( will be still calling it Filter internally! )
ps: wdyt about SubscribeFilter ( ref )
what about the Message though? what should we name it then? can't name it Publish as it will conflict with publish packet, nor Publication cuz sounds weird haha
as you mentioned here:
The subscription is a transfer contract between the client and programmer in how it will forward the filters to the broker
and as in standards:
The SUBSCRIBE packet is sent from the Client to the Server to create one or more Subscriptions
technically, Subscription isn't sent, rather it is created upon receiving SUBSCRIBE packet. So passing down Subscription to subscribe() doesn't sound right:
let subscription = Subscription::new("topic/filter", qos);
client.subscribe(subscription);
Paho also uses the same terminologies, they send a Message to publish! ref: https://github.com/eclipse/paho.mqtt.rust/blob/master/examples/async_publish.rs#L59-L60
We can continue to call it Publish and give it a publish builder. It looks like eclipse/paho is using the naive understanding of the terms, but naming is always the toughest part of writing software, so let's tread carefully on this one, will ensure the codebase is readable.
The SUBSCRIBE packet is sent from the Client to the Server to create one or more Subscriptions
technically, Subscription isn't sent, rather it is created upon receiving SUBSCRIBE packet. So passing down Subscription to subscribe() doesn't sound right:
Yes, that's what I meant by subscription being a contract between client and broker to forward messages. The subscription is created on the basis of a request that contains the filter along with a set of other params, i.e. QoS, etc. So going by the wording, a subscription request is the combined product of a set [filter, QoS and other parameters]; the subscription requests are packaged together along with a set of properties (not necessary) to create the MQTT subscription packet that is transferred over the network.
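One way to read that layering as data types. This is a rough sketch under assumed names (`SubscribeRequest`, `SubscribePacket`, `package`), with v5 properties reduced to plain key/value pairs; none of it is rumqttc's real API:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum QoS {
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

// One subscription request: a topic filter plus its options.
#[derive(Debug, Clone, PartialEq)]
pub struct SubscribeRequest {
    pub filter: String,
    pub qos: QoS,
    pub nolocal: bool,         // v5 only
    pub preserve_retain: bool, // v5 only
}

// The SUBSCRIBE packet: several requests packaged together,
// plus optional properties and the packet identifier.
#[derive(Debug, Clone, PartialEq)]
pub struct SubscribePacket {
    pub pkid: u16,
    pub requests: Vec<SubscribeRequest>,
    pub properties: Option<Vec<(String, String)>>,
}

// The pkid only shows up here, at packaging time, so the user-facing
// request type never carries a placeholder identifier.
pub fn package(
    pkid: u16,
    requests: Vec<SubscribeRequest>,
    properties: Option<Vec<(String, String)>>,
) -> SubscribePacket {
    SubscribePacket { pkid, requests, properties }
}
```

Keeping the pkid out of the request type is one way to avoid the placeholder-pkid footgun raised earlier in the thread.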
I have made the changes ( I already had them, so just pushed haha ) as per the originally proposed naming on rumqttc-pub-sub-api-revamp branch, have a look: https://github.com/bytebeamio/rumqtt/compare/main...rumqttc-pub-sub-api-revamp
naming is always the toughest part of writing software, so let's tread carefully on this one, will ensure the codebase is readable.
Definitely agree on treading carefully on this so our codebase is readable and our APIs are user friendly!
TBH, to me Filter/Message feels comfortable to use / maintain while PublishBuilder / Subscription introduces much more additional overhead both to users and readability, but that might just be due to some biases :sweat_smile:
let's also have some more people share their opinions / views on this? wdyt?
Hello,
Thanks for your work @swanandx , I'll give some thoughts on the changes.
I like the new Message struct to construct an MQTT message.
I'm doing the same on our wrapper around rumqtt and currently I need to unfold my struct in order to call client.publish(...), thus having a rumqtt::Message will allow us to do:
pub async fn publish(&self, msg: Message) -> Result<(), Error> {
    // Current code
    self.inner.publish(msg.topic(), msg.qos().into(), false, msg.payload()).await?
    // New code
    self.inner.publish(msg.into()).await? // We just need to add a `From<Message> for rumqtt::Message`
}
I also like the idea to have a builder for Message.
// Not sure if the `build()` step can fail or not ? Maybe for topic validation as you said.
let message = Message::new("topic", QoS::AtMostOnce).payload("").retain().build()?;
The state machine for the Builder could look like this. It's very simple and allows you to build at every step.
The only mandatory fields are topic and qos.
The payload defaults to being empty, and retain to false.
stateDiagram-v2
[*] --> New
New --> Payload
New --> Build
New --> Retain
Payload --> Build
Payload --> Retain
Retain --> Build
Build --> [*]
Thus, all the possibilities are:
let message = Message::new("topic", QoS::AtMostOnce).build()?;
let message = Message::new("topic", QoS::AtMostOnce).retain().build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").retain().build()?;
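A small sketch of a builder with those defaults, assuming hypothetical `Message`, `MessageBuilder`, and `EmptyTopic` types (not rumqttc's API). Note that this plain builder accepts `payload` and `retain` in any order, so it is slightly more permissive than the diagram, and `build()` only rejects an empty topic as a placeholder for real validation:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum QoS {
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub topic: String,
    pub qos: QoS,
    pub payload: Vec<u8>,
    pub retain: bool,
}

// Hypothetical error for the one check done in this sketch.
#[derive(Debug, PartialEq)]
pub struct EmptyTopic;

pub struct MessageBuilder {
    topic: String,
    qos: QoS,
    payload: Vec<u8>,
    retain: bool,
}

impl Message {
    // Mandatory fields go in `new`; everything else starts at its default.
    pub fn new(topic: impl Into<String>, qos: QoS) -> MessageBuilder {
        MessageBuilder {
            topic: topic.into(),
            qos,
            payload: Vec::new(), // default: empty payload
            retain: false,       // default: not retained
        }
    }
}

impl MessageBuilder {
    pub fn payload(mut self, payload: impl Into<Vec<u8>>) -> Self {
        self.payload = payload.into();
        self
    }

    pub fn retain(mut self) -> Self {
        self.retain = true;
        self
    }

    // Can be called at any step; fails only on an empty topic here.
    pub fn build(self) -> Result<Message, EmptyTopic> {
        if self.topic.is_empty() {
            return Err(EmptyTopic);
        }
        Ok(Message {
            topic: self.topic,
            qos: self.qos,
            payload: self.payload,
            retain: self.retain,
        })
    }
}
```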
For v5, if I understand correctly, we could add a property call that you can repeat.
stateDiagram-v2
[*] --> New
New --> Payload
New --> Build
New --> Retain
New --> Property
Payload --> Build
Payload --> Retain
Payload --> Property
Retain --> Build
Retain --> Property
Property --> Property
Property --> Build
Build --> [*]
Thus, it will lead to:
// Previous cases +
let message = Message::new("topic", QoS::AtMostOnce).property(...).build()?;
let message = Message::new("topic", QoS::AtMostOnce).retain().property(...).build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").property().build()?;
let message = Message::new("topic", QoS::AtMostOnce).payload("").retain().property().build()?;
// And you can chain properties
let message = Message::new("topic", QoS::AtMostOnce)
    .payload("")
    .retain()
    .property(...)
    .property(...)
    .build()?;
For the Subscribe part, I'm not sure of the terminology either.
Filter can be seen as a SubscribeFilter or a TopicFilter.
The Builder pattern suits the Filter perfectly because you only need to build it once.
I think it's better to use Message::new instead of Message::builder. The user doesn't want to create a Builder, they want a new Message. The builder pattern is good because it fits well with the natural way of thinking about constructing new things in Rust.
I have an idea, it might be weird, but I think it's worth proposing.
What if the Message was only the immutable data (i.e. topic, qos, retain, property) and we could construct a new Message each time with the wanted payload?
Let me show you what I mean.
We want to first create an immutable Message, and send it when we receive a payload.
- Sending the Message and the Payload as parameters.
/// Send data to MQTT when `rec` receives data from another task.
async fn send_task(client: AsyncClient, topic: Topic, qos: QoS, mut rec: Receiver<Vec<u8>>) -> Result<(), Error> {
    let msg = Message::new(topic).qos(qos).retain().build();
    while let Some(data) = rec.recv().await {
        client.publish(&msg, data).await?;
        // or: client.publish(msg.clone(), data).await?;
    }
    Ok(())
}
- A PublishMessage with the Message and Payload stored by ref inside it
/// Send data to MQTT when `rec` receives data from another task.
async fn send_task(client: AsyncClient, topic: Topic, qos: QoS, mut rec: Receiver<Vec<u8>>) -> Result<(), Error> {
    let msg = Message::new(topic).qos(qos).retain().build();
    while let Some(data) = rec.recv().await {
        client.publish(msg.payload(&data)).await?;
    }
    Ok(())
}
impl Message {
    fn payload<'a, 'b>(&'a self, payload: &'b [u8]) -> PublishMessage<'a, 'b> {
        PublishMessage::new(self, payload)
    }
}
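To make the borrowing in the second option concrete, here is a self-contained sketch without any async or client machinery. `Message`, `PublishMessage`, and the free function `publish` are hypothetical names for illustration; `publish` merely stands in for `client.publish` and reports how many payload bytes it was handed:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum QoS {
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

// The immutable part, built once and reused for every publish.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub topic: String,
    pub qos: QoS,
    pub retain: bool,
}

// A borrowed view pairing a Message with one payload; nothing is copied.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PublishMessage<'a, 'b> {
    pub message: &'a Message,
    pub payload: &'b [u8],
}

impl Message {
    // Both lifetimes are declared on the method so the view can borrow
    // the message and the payload independently.
    pub fn payload<'a, 'b>(&'a self, payload: &'b [u8]) -> PublishMessage<'a, 'b> {
        PublishMessage { message: self, payload }
    }
}

// Stand-in for client.publish(): returns the number of payload bytes received.
pub fn publish(request: PublishMessage<'_, '_>) -> usize {
    request.payload.len()
}
```

The trade-off is that the borrowed view cannot outlive either the message or the payload, which may be awkward if the request has to be queued and sent later by an event loop.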
It's kind of a similar way of how paho does it with its Topic.
Can you make a draft PR of the branch rumqttc-pub-sub-api-revamp in order to better see the changes ?
I'll gladly review the code even if it's a WIP.
Thank you so much for the feedback @benjamin-nw ! :rocket: :100:
here are my thoughts on it:
I also like the idea to have a builder for Message.
Instead of a builder, where you would need to call build() at end, I was thinking more of adding a consuming pattern like
let message = Message::new(..).payload("...");
// here payload fn will look something like
impl Message {
    fn payload(mut self, payload: _) -> Self {
        self.payload = payload;
        self
    }
}
reason:
- no need to call build() to get Message
note:
- we need to change new() to return Result<Self, Err> if we want to verify the topic right away.
- if we do so, we can't update the topic like message.topic = "_" as we won't be able to verify it. We would need fn like get_topic() & set_topic() to do so
we could add a property call that you can repeat.
if property(_) was there, it would take a PublishProperties struct, so we won't need to repeat the call.
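Putting the consuming setters and the validating constructor together, a sketch could look like the following. The names (`Message`, `InvalidTopic`) and the exact checks are assumptions for illustration only; here `new()` rejects empty topics and topics containing the `+` or `#` wildcards, and the topic field is private so it cannot be changed after validation:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum QoS {
    AtMostOnce,
    AtLeastOnce,
    ExactlyOnce,
}

// Hypothetical error type carrying the rejected topic.
#[derive(Debug, PartialEq)]
pub struct InvalidTopic(pub String);

#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    topic: String, // private: only set through the validating constructor
    pub qos: QoS,
    pub payload: Vec<u8>,
    pub retain: bool,
}

impl Message {
    // Verifies the topic right away, so `new` has to return a Result.
    pub fn new(topic: impl Into<String>, qos: QoS) -> Result<Self, InvalidTopic> {
        let topic = topic.into();
        let has_wildcard = topic.chars().any(|c| c == '+' || c == '#');
        if topic.is_empty() || has_wildcard {
            return Err(InvalidTopic(topic));
        }
        Ok(Self { topic, qos, payload: Vec::new(), retain: false })
    }

    // Consuming setter: takes `self` and hands it back, no build() step.
    pub fn payload(mut self, payload: impl Into<Vec<u8>>) -> Self {
        self.payload = payload.into();
        self
    }

    pub fn retain(mut self) -> Self {
        self.retain = true;
        self
    }

    // Read-only access; a different topic means constructing a new Message.
    pub fn topic(&self) -> &str {
        &self.topic
    }
}
```

Usage then reads as `Message::new("hello/world", QoS::AtLeastOnce)?.payload("hi").retain()`, with the `?` being the cost of validating up front.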
currently, instead of putting properties in message, we pass them to different functions:
client.publish(message);
client.publish_with_properties(message, properties); // v5 only
do you think adding them to Message is better? if yes, will do it!
Initially I didn't add them due to thinking like we are publishing the message with properties, the message itself won't have the properties right? and we needed to pass PublishProperties to Message which felt bit awkward haha. But I guess if it is more user friendly to add properties in Message itself, then sure we can do it!
Sending the Message and the Payload as parameters.
we can work around it without need for sending payload as parameter, like shown in examples:
async fn requests(client: AsyncClient) {
    let filter = Filter::new("hello/world", QoS::AtMostOnce);
    client.subscribe(filter).await.unwrap();
    let mut message = Message::new("hello/world", QoS::ExactlyOnce);
    for i in 1..=10 {
        message.payload = vec![1; i];
        client.publish(message.clone()).await.unwrap();
        time::sleep(Duration::from_secs(1)).await;
    }
}
It's kind of a similar way of how paho does it with its Topic.
Topic seems like just a wrapper around client, so more like an additional abstraction, which we can consider to be out of scope for this RFC right?
Can you make a draft PR of the branch rumqttc-pub-sub-api-revamp in order to better see the changes ?
I will do it once we have finalized the basic design from the RFC :100: for now, have a look here: https://github.com/bytebeamio/rumqtt/compare/main...rumqttc-pub-sub-api-revamp
CI running on PRs whenever I push a commit kinda annoys me haha ( jk :stuck_out_tongue_winking_eye: )
Thanks for the review! :)
Hi, happy new year 🎉 !
I was thinking more of adding a consuming pattern like
Yeah I think it's a good trade-off between having a new type for a Builder and having a good user experience for handling Messages 👍🏻
we need to change new() to return Result<Self, Err> if we want to verify the topic right away.
Agreed, or we can always validate the topic in the call to publish, it's also a solution to keep Message simple.
if we do so, we can't update the topic like message.topic = "_" as we won't be able to verify it. We would need fn like get_topic() & set_topic() to do so
I think it's a bad idea to have set_topic(). We can enforce the creation of a new Message if we want a new Topic. Thus, the only way to get a new topic is to create a new Message, which is very simple.
We'll only need topic() and payload(...).
do you think adding them to Message is better?
For this one, I'm not sure, we don't use properties. But keeping another function like publish_with_properties might be a bit clearer to really indicate that you are using v5.
Initially I didn't add them due to thinking like we are publishing the message with properties, the message itself won't have the properties right?
Yes, we can keep them separated. It makes more sense to have the properties outside the Message. Thus we can change properties without affecting the Message.
we can work around it without need for sending payload as parameter
For this, I'm not sure.
Is it expensive to clone() the Message? Because we'll be cloning our current Message a lot.
If we are using Bytes for the payload it's okay because we only clone an Arc, but the rest of the Message, it might cost more if the topic is big.
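A rough way to see the difference being described, using only the standard library. `Arc<[u8]>` is used here as a stand-in for `Bytes` (an assumption made to keep the sketch dependency-free), and `clone_sharing` is a hypothetical helper that reports whether each field of a clone still points at the original buffer:

```rust
use std::sync::Arc;

#[derive(Debug, Clone)]
pub struct Message {
    pub topic: String,      // cloning allocates a new buffer and copies the bytes
    pub payload: Arc<[u8]>, // cloning only bumps a reference count
}

// Returns (payload buffer shared, topic buffer shared) for a clone of `message`.
pub fn clone_sharing(message: &Message) -> (bool, bool) {
    let copy = message.clone();
    let payload_shared = Arc::ptr_eq(&message.payload, &copy.payload);
    let topic_shared = std::ptr::eq(message.topic.as_ptr(), copy.topic.as_ptr());
    (payload_shared, topic_shared)
}
```

So the per-clone cost in this model is dominated by the topic string, which is usually short; whether that matters in practice would need measuring.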
We need to check which one is cheaper in term of memory usage, and which one is simpler to use between:
async fn requests(client: AsyncClient) {
    let filter = Filter::new("hello/world", QoS::AtMostOnce);
    client.subscribe(filter).await.unwrap();
    let mut message = Message::new("hello/world", QoS::ExactlyOnce);
    for i in 1..=10 {
        message.payload = vec![1; i];
        client.publish(message.clone()).await.unwrap();
        time::sleep(Duration::from_secs(1)).await;
    }
}
or
async fn requests(client: AsyncClient) {
    let filter = Filter::new("hello/world", QoS::AtMostOnce);
    client.subscribe(filter).await.unwrap();
    let mut message = Message::new("hello/world", QoS::ExactlyOnce);
    for i in 1..=10 {
        let payload = vec![1; i];
        client.publish(&message, payload).await.unwrap();
        time::sleep(Duration::from_secs(1)).await;
    }
}
Topic seems like just a wrapper around client, so more like an additional abstraction, which we can consider to be out of scope for this RFC right?
I agree and I don't like the way paho implements this wrapper either. So this is out of scope 👍🏻
I will do it once we have finalized the basic design from the RFC 💯 for now
Got it 👍🏻 will look at the branch for now.
Hey, Happy New Year! :partying_face: :fireworks:
Thanks for the detailed reply!
it might cost more if the topic is big.
If we pass the topic as reference to avoid cloning, it will be actually of type Into<String> ( similar to our existing design ), and we will end up calling .into() on it to convert it to String anyways ( e.g. here ). So I think cost will be similar in both the cases.
Payload being the part of Message seems more intuitive and might as well help in debugging? ( not sure here haha )
note: passing payload as parameter, would allow fns like publish_bytes where we can use Bytes directly!
We need to check which one is cheaper in term of memory usage, and which one is simpler to use between:
Even though I was prioritizing simplicity here ( and to tackle performance later ), I definitely agree with you, we should verify the perf impact! I'm positive that it will be negligible / none for most of the users :)
Payload being the part of Message seems more intuitive
Yup, we can do this then.
If we pass the topic as reference to avoid cloning, it will be actually of type Into<String>
Oh, didn't know that, good for me 👍🏻
and might as well help in debugging?
Technically yes because the payload will be in Message and not elsewhere.
note: passing payload as parameter, would allow fns like publish_bytes where we can use Bytes directly!
You mean as a public API ?
Even though I was prioritizing simplicity here ( and to tackle performance later )
Full 👍🏻 on this. Let's make a simple yet useful API and tackle performance issues (if any) later.
You mean as a public API ?
Yup, it is currently in API https://github.com/bytebeamio/rumqtt/blob/main/rumqttc%2Fsrc%2Fclient.rs#L132 but I removed it in this RFC due to Message. Maybe we can use generics & traits such that Message payload field can accept both? Will have to see how complex it gets though 👀
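One possible shape for the "accept both" idea, sketched with the standard library only. The `Message` type and its `payload` setter are hypothetical; the setter is generic over `Into<Vec<u8>>`, and with the `bytes` crate the bound could presumably become `Into<Bytes>` instead, which is an untested assumption here:

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub topic: String,
    pub payload: Vec<u8>,
}

impl Message {
    pub fn new(topic: impl Into<String>) -> Self {
        Self { topic: topic.into(), payload: Vec::new() }
    }

    // Accepts anything convertible into bytes: &str, String, Vec<u8>, &[u8].
    pub fn payload(mut self, payload: impl Into<Vec<u8>>) -> Self {
        self.payload = payload.into();
        self
    }
}
```

The generic setter keeps a single entry point, at the price of making direct field assignment like `message.payload = "payload"` depend on the concrete field type.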
This is an RFC for API changes in publish / subscribe in rumqttc. Feel free to comment and provide feedback, thanks!
Publish
- Message
- Message for publishing
note: topic will be verified in call to publish(). Shall we perform the topic verification logic while constructing Message?
Subscribe
- Filter to subscribe
note: where shall we verify the filter?
Notes
- publish and subscribe would return pkid ( but up-to possibility of implementation )
- try_publish, try_subscribe, subscribe_many etc. will be updated accordingly
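Wherever the verification ends up living, the wildcard rules themselves are small. Below is a sketch of the two checks, with hypothetical function names `valid_topic` and `valid_filter`; it only covers the empty-string and wildcard-placement rules and leaves out other constraints (length limits, null characters, shared subscriptions):

```rust
// Topic names used when publishing: at least one character, no wildcards.
pub fn valid_topic(topic: &str) -> bool {
    !topic.is_empty() && !topic.chars().any(|c| c == '+' || c == '#')
}

// Topic filters used when subscribing: a wildcard must occupy a whole
// level, and '#' is only allowed as the last level.
pub fn valid_filter(filter: &str) -> bool {
    if filter.is_empty() {
        return false;
    }
    let levels: Vec<&str> = filter.split('/').collect();
    let last = levels.len() - 1;
    levels.iter().enumerate().all(|(i, level)| match *level {
        "#" => i == last,
        "+" => true,
        other => !other.contains('+') && !other.contains('#'),
    })
}
```

Both functions are pure, so they could be called from a constructor, from `publish()` / `subscribe()`, or from both without changing behaviour.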