flxo opened this issue 8 months ago (status: Open)
Hey @flxo , so basically we want to buffer the outgoing packets and then flush / write them to the network at once to avoid multiple syscalls, correct?

- How will it work out for keepalive messages? We have to send them as soon as the timeout is reached.
- What happens when a message is lost before being written to the network? Let's say my batch size is 5 and I publish messages 1, 2, 3. As the batch size isn't reached, they aren't flushed, but with message 4 (or for some other reason) the client disconnects / crashes. Now the state/client thinks it has published the messages, but in reality it hasn't. (The same might happen if the broker disconnects, right?) Though this can be mitigated with QoS 1/2 so we can republish those messages, losing messages is not good. [We could have put them back in `pending`, but the state has already been updated and it would just cause a lack of consensus.]

Note: I haven't dived in depth into the PoC, but I think an easier solution might be:

- modify `next_request` to return `0..=MAX_BATCH_SIZE` requests
- handle those requests and then call flush, kinda like:

      o = Self::next_requests(..), if .. => match o {
          Ok(requests) => {
              for req in requests {
                  self.state.handle_outgoing_packet(req)?;
              }
              match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
                  Ok(inner) => inner?,
                  Err(_) => return Err(ConnectionError::FlushTimeout),
              };
              Ok(self.state.events.pop_front().unwrap())
          }
          Err(_) => Err(ConnectionError::RequestsDone),
      },

^ This may have different disadvantages which I haven't thought about though, haha.

Btw, can you elaborate on the use case in more detail? Batching publishes would mean we are okay with messages being published late, right?

Batching would be a cool feature to have though :100:

Thank you so much!
Thanks for your thoughts!
> Hey @flxo , so basically we want to buffer the outgoing packets and then flush / write them to the network at once to avoid multiple syscalls, correct?

Yes.
> How will it work out for keepalive messages? We have to send them as soon as the timeout is reached.

Currently the keepalives are flushed when the loop breaks or finishes. We could easily use `send` instead of `feed` for the pings to ensure they're sent immediately.
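For illustration, the difference in a minimal sketch (assuming a `futures::Sink` of packets, e.g. the sink side of `Framed`; `Packet` is just a placeholder type, not the crate's):

```rust
use futures::{Sink, SinkExt};

struct Packet; // placeholder for the real encoded packet type

async fn tx_sketch<S>(sink: &mut S, publish: Packet, pingreq: Packet) -> Result<(), S::Error>
where
    S: Sink<Packet> + Unpin,
{
    // `feed` only serialises into the sink's write buffer; no write syscall yet.
    sink.feed(publish).await?;
    // `send` = feed + flush: the ping (and anything already buffered) goes out
    // right away, so the keepalive deadline is not missed.
    sink.send(pingreq).await?;
    Ok(())
}
```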
> What happens when a message is lost before being written to the network? Let's say my batch size is 5 and I publish messages 1, 2, 3. As the batch size isn't reached, they aren't flushed, but with message 4 (or for some other reason) the client disconnects / crashes. Now the state/client thinks it has published the messages, but in reality it hasn't. (The same might happen if the broker disconnects, right?) Though this can be mitigated with QoS 1/2 so we can republish those messages, losing messages is not good. [We could have put them back in `pending`, but the state has already been updated and it would just cause a lack of consensus.]
Good catch. This is definitely missing in my hack. One option could be to retrieve the sending buffer from `Framed` and reuse it for the next connection. I've done that recently in a project where we had a similar issue.
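Roughly like this (a sketch; `MqttCodec` and `Packet` are placeholder names, and it assumes the codec implements `Encoder<Packet>`):

```rust
use bytes::BytesMut;
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, FramedParts};

// On disconnect, keep the bytes that were serialised but never written out...
fn take_write_buffer(framed: Framed<TcpStream, MqttCodec>) -> BytesMut {
    framed.into_parts().write_buf
}

// ...and splice them into the `Framed` of the next connection so nothing is lost.
fn resume(io: TcpStream, codec: MqttCodec, pending: BytesMut) -> Framed<TcpStream, MqttCodec> {
    let mut parts = FramedParts::new::<Packet>(io, codec);
    parts.write_buf = pending;
    Framed::from_parts(parts)
}
```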
> Note: I haven't dived in depth into the PoC, but I think an easier solution might be:
>
> - modify `next_request` to return `0..=MAX_BATCH_SIZE` requests
> - handle those requests and then call flush, kinda like:
>
>       o = Self::next_requests(..), if .. => match o {
>           Ok(requests) => {
>               for req in requests {
>                   self.state.handle_outgoing_packet(req)?;
>               }
>               match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
>                   Ok(inner) => inner?,
>                   Err(_) => return Err(ConnectionError::FlushTimeout),
>               };
>               Ok(self.state.events.pop_front().unwrap())
>           }
>           Err(_) => Err(ConnectionError::RequestsDone),
>       },
>
> ^ This may have different disadvantages which I haven't thought about though, haha.
You cannot await a timeout in the select branch. This would stall the processing of incoming frames as well.
A less intrusive option could be to return a `Vec` of `Request`s from `next_requests`. This would use `recv_async` and `try_recv` like this (rough pseudocode):

    async fn next_requests(..) -> Result<Vec<Request>, ...> {
        // Block for the first request, then opportunistically drain whatever is
        // already queued, up to the capacity of the batch.
        let mut result = Vec::with_capacity(10);
        let next = requests_rx.recv_async().await?;
        result.push(next);
        while result.len() < result.capacity() {
            match requests_rx.try_recv() {
                Ok(r) => result.push(r),
                Err(TryRecvError::Empty) => return Ok(result),
                Err(e) => return Err(e.into()),
            }
        }
        Ok(result)
    }
> Btw, can you elaborate on the use case in more detail? Batching publishes would mean we are okay with messages being published late, right?

Not really late. The tx buffer is filled as long as there are requests pending from the client (and up to a certain limit). There is no timeout awaited in between; the flush happens right after.

The use case is a client with a high publish rate. We're currently starving on the publish side because there's also inflow traffic, which is processed with (currently) 10 times more iterations per poll than the outgoing packets.
> Batching would be a cool feature to have though 💯

Agree :-)

> Thank you so much!
> A less intrusive option could be to return a `Vec` of `Request`s from `next_requests`. This would use `recv_async` and `try_recv` like this:

That is exactly what I meant when I said modify `next_request` to return `0..BATCH_SIZE` requests. I'm putting a range here as sometimes we would have fewer than BATCH_SIZE elements.

^ Just like the pseudocode you mentioned in the comment!

Can you also please confirm the perf impact once (likely there won't be much with the above approach), and also whether it's actually helping in reducing syscalls?

Thanks 😊
> > A less intrusive option could be to return a `Vec` of `Request`s from `next_requests`. This would use `recv_async` and `try_recv` like this:
>
> That is exactly what I meant when I said modify `next_request` to return `0..BATCH_SIZE` requests. I'm putting a range here as sometimes we would have fewer than BATCH_SIZE elements. ^ Just like the pseudocode you mentioned in the comment!

I refined the pseudocode here.

> Can you also please confirm the perf impact once (likely there won't be much with the above approach), and also whether it's actually helping in reducing syscalls?

Can you specify what exactly you want? Profiling data, e.g. flame graphs?

The biggest advantage is to avoid starvation on the upstream path.
> Can you also please confirm the perf impact once (likely there won't be much with the above approach), and also whether it's actually helping in reducing syscalls?

A colleague tested the patch in one of our products. It's arm64 Linux on an A57 with a constant publication rate of ~800/s to a local broker (test setup). We see a reduction in the CPU usage of the client application of ~8.5%.

Is there a reproducible benchmark somewhere around here?
> We see a reduction in the CPU usage of the client application of ~8.5%.

niceee!

> Is there a reproducible benchmark somewhere around here?

Not that I'm aware of :sweat_smile:
I had a doubt regarding the patch: why are we doing `read_bulk` for incoming packets? The `readb` function already reads packets in bulk, so I think we only need the handling of outgoing packets in batches, right?
> I had a doubt regarding the patch: why are we doing `read_bulk` for incoming packets? The `readb` function already reads packets in bulk, so I think we only need the handling of outgoing packets in batches, right?

I changed the inflow just to align it in style with the outflow. The current implementation processes the messages in `readb`, which is super confusing (and needs to be cancel safe ;-)). Oh, and I renamed `read` to `read_bulk`.
> and needs to be cancel safe

Right, was `readb` not cancel safe? Can you elaborate more?

I would say the `readb` approach was better (we should rename it to `read_bulk` though :stuck_out_tongue_closed_eyes:), as we have to process the messages we receive right away, plus we avoid unnecessary allocations. Wdyt?

Thanks!
> > and needs to be cancel safe
>
> Right, was `readb` not cancel safe? Can you elaborate more?

Sorry, this was imprecise. `readb` is cancel safe, but since the full rx path is polled in a select branch, it has to be. These mistakes happen fast (at least to me).
> I would say the `readb` approach was better (we should rename it to `read_bulk` though 😝), as we have to process the messages we receive right away, plus we avoid unnecessary allocations. Wdyt?

You're right regarding the allocations.

Shall I draft a PR with the update to the rx path only?

Just out of curiosity: there's the other branch that uses `tokio_util::codec::Framed`. It's missing the feature to retain the tx buffer over reconnects, but otherwise it is complete and passes the tests. The drawback is that it introduces some async in `State` and the dependency on `tokio_util`. It should be optimal from the allocation point of view because the packets are serialised directly into the buffer of `Framed` (yes, that's also the case today, but not homebrewn). Also, tx and rx are processed interleaved. The `Framed` impl is way easier to read than today's. What's your opinion on that one?
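To make the "serialised directly into the buffer of `Framed`" point concrete, a minimal sketch of the encoder side (placeholder `MqttCodec`/`Packet` names; `Packet::write` stands in for a mqttbytes-style serialiser that appends the wire bytes to a `BytesMut`):

```rust
use bytes::BytesMut;
use tokio_util::codec::Encoder;

struct MqttCodec; // placeholder codec

impl Encoder<Packet> for MqttCodec {
    type Error = std::io::Error;

    fn encode(&mut self, packet: Packet, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // `dst` is Framed's internal write buffer: the packet bytes land there
        // directly, and nothing touches the socket until the sink is flushed.
        packet
            .write(dst)
            .map(|_| ())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))
    }
}
```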
> the full rx path is polled in a select branch it has to be

Still kinda went over me :sweat_smile: Can you please explain what you mean by "full rx path" and what mistakes?

> Shall I draft a PR with the update to the rx path only?

Let's keep this issue related to batching of `Outgoing` packets only! Also, can you please open a PR for just that part?

We will have different PRs/issues/conversations for anything related to the handling of incoming packets; this will make it easier to review and reason about. Wdyt?

> It should be optimal from the allocation point of view because the packets are serialised directly into the buffer of Framed

IIRC, we are already re-using the buffer to read packets, so I think it will be the same as you mentioned (also due to how mqttbytes parses packets).

This might be biased (and due to the branch containing other refactors), but for readability I find the current code more maintainable / readable than with `Framed`. And if it's not bringing any additional benefits, I think sticking to the existing structure makes more sense, right :smile:

Thanks!
Thanks for your answer.
> > the full rx path is polled in a select branch it has to be
>
> Still kinda went over me 😅 Can you please explain what you mean by "full rx path" and what mistakes?

When modifying the read-related code in `Network` you always have to keep in mind that it must be cancel safe, because it's all driven from the `select!` in `event_loop`. This is not obvious from the module itself. Anyways, the current impl is correct.
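A contrived example of the kind of mistake that can slip in (illustration only, nothing to do with the actual rumqttc code): `read_exact` is documented as not cancel safe, so using it directly as a `select!` branch can drop bytes whenever another branch wins the race.

```rust
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::time::Interval;

// If the keepalive branch completes first, the read future is dropped and any
// bytes it already pulled off the socket into `header` are silently lost.
async fn broken_rx(socket: &mut TcpStream, keepalive: &mut Interval) -> std::io::Result<()> {
    let mut header = [0u8; 2];
    tokio::select! {
        n = socket.read_exact(&mut header) => { n?; /* parse the fixed header */ }
        _ = keepalive.tick() => { /* time to send a PINGREQ */ }
    }
    Ok(())
}
```

A cancel-safe rx path instead accumulates into a buffer owned by the connection (e.g. `read_buf` into a persistent `BytesMut`), so a dropped future never loses data; that's the property `readb` has to keep.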
> > Shall I draft a PR with the update to the rx path only?
>
> Let's keep this issue related to batching of `Outgoing` packets only! Also, can you please open a PR for just that part?
Sorry - my mistake - should have been tx.
> We will have different PRs/issues/conversations for anything related to the handling of incoming packets; this will make it easier to review and reason about. Wdyt?
Fully agree.
> > It should be optimal from the allocation point of view because the packets are serialised directly into the buffer of Framed
>
> IIRC, we are already re-using the buffer to read packets, so I think it will be the same as you mentioned (also due to how mqttbytes parses packets).
>
> This might be biased (and due to the branch containing other refactors), but for readability I find the current code more maintainable / readable than with `Framed`. And if it's not bringing any additional benefits, I think sticking to the existing structure makes more sense, right 😄
Fine for me.
Expected Behavior
Outgoing publications are sent to the network io in batches and not in inefficient writes per publish.
Current Behavior
The (v5) client code "flushes" the outgoing queue after each request from the application here. This leads to one `write` or `sendto` system call per publication, which is inefficient.

Proposal
Queue up to n requests from the request channel and flush once, similar to the processing of the incoming packets where up to 10 packets are handled in a batch.

I implemented a small PoC that loops over requests and processes up to n incoming or outgoing packets before flushing the io. We're currently evaluating the performance impact in our scenario, which has publications from the client to the broker at a high rate (a less common use case).
Let me know if this is a possible way to tackle the problem.
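For reference, the io-level idea in its simplest form (a sketch using a plain tokio `BufWriter` over pre-encoded packets, not the actual rumqttc internals):

```rust
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::net::TcpStream;

// Serialise a whole batch into the in-memory buffer, then flush once, so n
// publishes cost (typically) one write/sendto instead of n.
async fn write_batch(stream: &mut BufWriter<TcpStream>, packets: &[Vec<u8>]) -> std::io::Result<()> {
    for bytes in packets {
        stream.write_all(bytes).await?; // appends to the buffer, usually no syscall
    }
    stream.flush().await?; // single write for everything accumulated
    Ok(())
}
```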