Closed kleunen closed 6 years ago
NOTE for others: this issue originated from the following commit comments: https://github.com/redboltz/mqtt_cpp/commit/7b197fbfb11d239325a84e225d0eaa599b5a0a88#commitcomment-28289184
This is the response for the commit comments.
I guess that you already use the scatter-gather I/O in this commit ?
Yes.
Also I implemented the function that makes continuous buffer from chunks. https://github.com/redboltz/mqtt_cpp/commit/7b197fbfb11d239325a84e225d0eaa599b5a0a88#diff-a4e7087e8fcd9fd2d33e73f7ba28a4bfR118
But it exists only to keep backward compatibility with the serialize (persistence) handler. create_continuous_buffer()
is NOT used for asio communication.
I think that the commit shouldn't introduce heavy heap usage, but it does introduce some additional heap allocations and deallocations.
For example,
https://github.com/redboltz/mqtt_cpp/commit/7b197fbfb11d239325a84e225d0eaa599b5a0a88#diff-a4e7087e8fcd9fd2d33e73f7ba28a4bfR97 creates a vector for the scatter-gather I/O. It introduces a memory allocation for the pairs of pointer and size.
I introduced shared_ptr for the payload. If the user passes the payload as std::string const&
, it is copied to std::shared_ptr<std::string>
. See
https://github.com/redboltz/mqtt_cpp/commit/7b197fbfb11d239325a84e225d0eaa599b5a0a88#diff-d2b35043a1ae645c0e0dc965aaebbbe7R4380
If the user passes the payload as std::shared_ptr<std::string> const&
, it is NOT copied. This is the typical use case when implementing a broker, which is why I introduced the update.
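A minimal sketch of the difference between the two payload types, using only the standard library. `endpoint_sketch` and its `stored` member are hypothetical stand-ins and not the mqtt_cpp API; only the copy behaviour described above is modelled.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for the endpoint; only payload handling is modelled.
struct endpoint_sketch {
    // Payloads kept by the library, e.g. for a possible resend.
    std::vector<std::shared_ptr<std::string>> stored;

    // std::string const& overload: the payload is copied once into a shared_ptr.
    void publish(std::string const& contents) {
        publish(std::make_shared<std::string>(contents));
    }

    // shared_ptr overload: no payload copy, only the reference count changes.
    void publish(std::shared_ptr<std::string> const& contents) {
        assert(contents);  // a null payload is not handled in this sketch
        stored.push_back(contents);
    }
};
```

With the second overload, the caller and the library refer to the same string object, so a broker can hand one payload to many endpoints.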
OK, thank you very much for your reply. I would say this should definitely reduce heap fragmentation, because fewer and smaller heap allocations are performed. I will keep an eye on how memory usage behaves with the new buffer handling.
I can modify my code to also pass a std::shared_ptr
@kleunen , I am ready to improve mqtt_cpp. Your report is very helpful. If you find something, please let me know.
By the way, the publish API has two kinds of overloads. They differ in the type of the contents (payload).
One is std::string const&
, the other is std::shared_ptr<std::string> const&
. The former is the original one. The latter is newly introduced. When I implemented the latter, I came up with a third one: std::string
, taken by value. It is for move.
So I could implement it as follows:
void publish(
    ...
    std::string contents,
    ...
) {
    // Move the by-value argument into a shared_ptr and forward to the shared_ptr overload.
    publish(
        ....
        std::make_shared<std::string>(std::move(contents)),
        ....
    );
}
I think that it is not so useful, because the user often needs to keep the payload after publish, so move is not appropriate. In addition, the publish API already has many overloads. If I introduced the third type, std::string
, the number of overloads would increase a lot. It's my headache ;)
In the end, I decided not to introduce the std::string
overload.
Yes, I think having two/many buffer approaches (using shared_ptr
I would suggest using this approach as preferred / internal approach, and providing wrapper functions to allow std::string and std::shared_ptr
The approach that asio takes (using the pair) is not the most convenient approach. The user of the API must make sure that deallocation happens only once the operation has completed. If the user gets this wrong, the application will fail horribly. But it is a very powerful approach: the user can decide how the allocation / deallocation tracking should be done, for example using the shared_ptr, or some other approach (keeping the string in a vector or whatever). You now make the decision for the user of the API. It is not a bad approach per se, but the boost library just sets a very high standard on the design of APIs, and that is why they chose that approach, I guess.
Maybe good to compare your approach also to the approach the beast library is using. I never used the beast library.
I think that the big difference between asio and mqtt_cpp is the automatic resend functionality. asio is very simple. Each async_write is self-contained, so the user only needs to keep ptr alive until the write completion handler is called. In contrast, if QoS is 1 or 2, mqtt_cpp needs to keep the message that contains the topic name, payload, and so on after async_publish has finished. The message is erased when the puback or pubrec message is received. If the connection is lost before puback or pubrec is received, and the user connects again with the CleanSession flag set to false, then the message should be resent. This is a complicated part of the MQTT specification. I have omitted the details about pubrel, pubcomp and message persistence.
I think that it's difficult for the user to manage the message's lifetime.
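A rough sketch of the bookkeeping described above, assuming the library keeps QoS 1/2 messages in a map keyed by packet id. `stored_message` and `inflight_store_sketch` are hypothetical names, and, like the description, the sketch leaves out pubrel, pubcomp and persistence.

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record of one QoS 1/2 publish that has not been acknowledged yet.
struct stored_message {
    std::shared_ptr<std::string> topic;
    std::shared_ptr<std::string> payload;
};

class inflight_store_sketch {
public:
    // Called when a QoS 1/2 publish is sent: the message must outlive the write.
    void on_publish(std::uint16_t packet_id, stored_message msg) {
        inflight_[packet_id] = std::move(msg);
    }

    // Called when puback (QoS 1) or pubrec (QoS 2) arrives: the message is erased.
    void on_ack(std::uint16_t packet_id) { inflight_.erase(packet_id); }

    // Packet ids to send again after reconnecting with CleanSession set to false.
    std::vector<std::uint16_t> pending_resend() const {
        std::vector<std::uint16_t> ids;
        for (auto const& kv : inflight_) ids.push_back(kv.first);
        return ids;
    }

private:
    std::map<std::uint16_t, stored_message> inflight_;
};
```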
I think your scenario is not more difficult than the asio use case. Indeed, you have to keep track of two buffers (topic and payload), whereas asio only tracks one buffer (payload). But this makes no difference, I would say. Asio has a completion handler for every operation, and you can have the same. So the publish function then becomes:
void publish(topic, payload, qos, retain, std::function<void()> completion_handler)
the user can now do this:
void PublishComplete(std::shared_ptr<std::string> const &topic, std::shared_ptr<std::string> const &payload)
{
}
std::shared_ptr<std::string> topic(new std::string("Hello"));
std::shared_ptr<std::string> payload(new std::string("world"));
mqtt.publish(as::buffer(topic->data(), topic->size()), as::buffer(payload->data(), payload->size()),
qos::at_most_once, boost::bind(PublishComplete, topic, payload));
Internally, your library keeps a publish request:
struct PublishRequest
{
    as::const_buffer topic;    // non-owning view of the topic
    as::const_buffer payload;  // non-owning view of the payload
    std::function<void()> completion_handler;
};
std::deque<PublishRequest> outstanding_requests;
It will add the request to the 'outstanding_requests' on a publish call, and call the completion handler and remove the request when the request is complete:
void RequestComplete(size_t index) {
outstanding_requests[index].completion_handler();
outstanding_requests.erase(outstanding_requests.begin() + index);
}
When the completion is called depends on the QoS of the message:
QoS 0: called when the asio write has completed
QoS 1: called when puback is received
QoS 2: called when pubcomp is received
The library now tracks the lifetime of the message, not the user. Hopefully this is clear.
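The QoS rule above can be written down as a small lookup. This is only an illustrative sketch; the `qos` and `completion_event` enums and the `completes_on` function are hypothetical names, not part of mqtt_cpp.

```cpp
#include <cstdint>

enum class qos : std::uint8_t { at_most_once = 0, at_least_once = 1, exactly_once = 2 };

// Events after which the completion handler may be called.
enum class completion_event { write_finished, puback_received, pubcomp_received };

// Returns the event that completes a publish request of the given QoS.
inline completion_event completes_on(qos q) {
    switch (q) {
    case qos::at_least_once: return completion_event::puback_received;
    case qos::exactly_once:  return completion_event::pubcomp_received;
    case qos::at_most_once:  break;
    }
    return completion_event::write_finished;
}
```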
I don't understand what you mean yet.
Could you use triple continuous back quotes? It can avoid github formatting.
As far as I understand, the type of payload is const_buffer
. Is that right?
void publish(topic, payload, qos, retain, std::function<void()> completion_handler)
And it seems that the type of payload is std::shared_ptr<std::string> const&
.
void PublishComplete(std::shared_ptr<std::string> const &topic, std::shared_ptr<std::string> const &payload)
It implies that the library creates the std::shared_ptr<std::string>
internally, which requires a copy operation.
I think that the type of payload should be std::shared_ptr<std::string> const&
in at least one overload of
void publish(topic, payload, qos, retain, std::function<void()> completion_handler)
The publish API is not only for MQTT client but also MQTT server (broker). So the name is endpoint.
Consider this scenario.
publisher1 ----- broker ------ subscribers 1...10000
publisher1 publishes the message. The message is delivered to all subscribers.
The broker uses the publish API to deliver the message. If the publish API takes not std::shared_ptr<std::string> const&
but ptr and size
, then 10000 copies are required.
See the test broker. It demonstrates delivering a message to many subscribers without copying the payload. https://github.com/redboltz/mqtt_cpp/blob/master/test/test_broker.hpp#L259
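A minimal sketch of the fan-out scenario, assuming each subscriber connection keeps a shared reference to the payload. `subscriber_sketch` and `deliver` are hypothetical names and this is not the code in test_broker.hpp.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for one subscriber connection.
struct subscriber_sketch {
    std::vector<std::shared_ptr<std::string>> queue;  // messages waiting to be written

    void publish(std::shared_ptr<std::string> const& contents) {
        queue.push_back(contents);  // shares ownership, no payload copy
    }
};

// Delivers one incoming payload to every subscriber.
inline void deliver(std::vector<subscriber_sketch>& subscribers, std::string contents) {
    // Exactly one allocation for the payload, regardless of the subscriber count.
    auto sp = std::make_shared<std::string>(std::move(contents));
    for (auto& s : subscribers) {
        s.publish(sp);
    }
}
```

The payload is freed once the last subscriber drops its reference.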
Thank you for updating the comment https://github.com/redboltz/mqtt_cpp/issues/124#issuecomment-376543939. Now I understand the concept of your approach. I think that it is nice :) I will consider the approach in detail.
For the broker, consider the following example:
#include <iostream>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <deque>
#include <string>
#include <utility>
struct PublishMessage
{
std::string topic;
std::string payload;
size_t references;
PublishMessage(std::string const &_topic, std::string const &_payload)
: topic(_topic), payload(_payload), references(0)
{ }
~PublishMessage()
{
std::cout << "Message deleted" << std::endl;
}
};
typedef boost::intrusive_ptr<PublishMessage> PublishMessagePtr;
inline void intrusive_ptr_add_ref(PublishMessage* m){
++m->references;
}
inline void intrusive_ptr_release(PublishMessage* m){
if(--m->references == 0)
delete m;
}
void CompleteHandler(PublishMessagePtr const &message)
{
std::cout << "Complete" << std::endl;
}
struct OutstandingMessage
{
std::pair<char const *, size_t> topic;
std::pair<char const *, size_t> payload;
boost::function<void()> complete;
OutstandingMessage(std::pair<char const *, size_t> const &_topic, std::pair<char const *, size_t> const &_payload, boost::function<void()> const &_complete)
: topic(_topic), payload(_payload), complete(_complete)
{ }
};
std::deque<OutstandingMessage> outstanding_message;
void Publish(std::pair<char const *, size_t> const &topic, std::pair<char const *, size_t> const &payload, boost::function<void()> const &complete)
{
outstanding_message.push_back(OutstandingMessage(topic, payload, complete));
std::cout << "Publish: " << std::string(topic.first, topic.second) << " " << std::string(payload.first, payload.second) << std::endl;
}
void SimulatePublishFinish()
{
std::cout << "Publish message complete!!" << std::endl;
outstanding_message.front().complete();
outstanding_message.pop_front();
}
void DoPublish()
{
PublishMessagePtr message(new PublishMessage("hello", "world"));
Publish(
std::make_pair<char const *, size_t>(message->topic.data(), message->topic.size()),
std::make_pair<char const *, size_t>(message->payload.data(), message->payload.size()),
boost::bind(CompleteHandler, message)
);
}
int main() {
DoPublish();
SimulatePublishFinish();
return 0;
}
You can run the example here: Example
It seems in your code, the topic is still copied, but the payload is stored in shared_ptr, correct ?
For the broker, I think that I don't need to introduce the intrusive_ptr approach.
Here is the type definition of the publish_handler.
using publish_handler = std::function<bool(std::uint8_t fixed_header,
boost::optional<std::uint16_t> packet_id,
std::string topic_name,
std::string contents)>;
publish_handler passes contents as std::string
by value. This is the current implementation.
In broker code:
ep.set_publish_handler(
[]
(std::uint8_t fixed_header,
boost::optional<std::uint16_t> packet_id,
std::string topic_name,
std::string contents) {
auto sp = std::make_shared<std::string>(std::move(contents));
// for all subscribers
{ // loop
// new publish you mentioned
publish(
// ...
boost::asio::buffer(sp->data(), sp->size()),
// ...
[sp]{} // completion handler
);
}
}
);
It seems in your code, the topic is still copied, but the payload is stored in shared_ptr, correct ?
Yes. Before you posted this issue, I hadn't decided on it, because I didn't want many combinations of std::string const&
and std::shared_ptr<std::string> const&
for topic and contents.
But now, I can apply boost::asio::const_buffer
to both topic and contents, because it is the most efficient option. The existing APIs can be implemented on top of the boost::asio::const_buffer
one.
OK, good. The initial result of using the shared_ptr payload implementation is that the memory consumption of my server application is significantly reduced. The memory consumption is about 35 MB now, where it was 80-90 MB before. But I still consider this quite high, because all the server does is convert ssl json -> mqtt packets. It does not store anything. I think also passing the topic as a reference and using boost::asio::const_buffer instead of shared_ptr would be a good improvement.
@kleunen ,
I'm implementing the publish function.
void publish(topic, payload, qos, retain, std::function<void()> completion_handler)
I'd like to ask your idea.
Topic and payload are fine. Their lifetime is managed by the user via completion_handler. What about the other parts of the publish packet?
The publish packet contains the following fields: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718037
No | field | note |
---|---|---|
1 | fixed header | managed by lib |
2 | remaining length | managed by lib |
3 | topic name length | managed by lib |
4 | topic name | managed by user |
5 | packet id | managed by lib (QoS1, 2 only) |
6 | payload | managed by user |
My question is how I should handle fields No 1, 2, 3, and 5.
My first idea is introducing two std::shared_ptr<std::string>
. One is for No 1-3, the other is for No 5.
Do you have a better idea?
I mean something like this:
void publish(topic, payload, qos, retain, std::function<void()> completion_handler)
{
auto buf1to3 = std::make_shared<std::string>();
auto buf5 = std::make_shared<std::string>();
// ... build bufs
auto cb = [completion_handler, buf1to3, buf5] {
completion_handler();
};
// build const_buffer_sequence (argument is boost::asio::const_buffer)
// pseudo code
const_buffer_sequence.insert(buf1to3);
const_buffer_sequence.insert(topic_name);
const_buffer_sequence.insert(buf5);
const_buffer_sequence.insert(payload);
async_write(const_buffer_sequence, cb);
}
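As a sketch of the "build bufs" step, the following shows how fields No 1-3 could be assembled following the MQTT 3.1.1 PUBLISH layout. `encode_remaining_length` and `build_buf1to3` are hypothetical helper names, and the DUP flag is left at 0 for simplicity.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Variable-length "remaining length" encoding from MQTT 3.1.1 (1 to 4 bytes).
inline std::string encode_remaining_length(std::size_t len) {
    assert(len <= 268435455u);  // largest value that fits in 4 encoded bytes
    std::string out;
    do {
        std::uint8_t byte = static_cast<std::uint8_t>(len % 128);
        len /= 128;
        if (len > 0) byte |= 0x80;  // continuation bit: more bytes follow
        out.push_back(static_cast<char>(byte));
    } while (len > 0);
    return out;
}

// Hypothetical helper: fixed header byte, remaining length and topic name length.
inline std::string build_buf1to3(std::uint8_t qos, bool retain,
                                 std::size_t topic_len, std::size_t payload_len) {
    assert(qos <= 2 && topic_len <= 0xFFFF);
    std::size_t const packet_id_len = (qos > 0) ? 2 : 0;  // packet id only for QoS 1 and 2
    std::size_t const remaining = 2 + topic_len + packet_id_len + payload_len;
    std::string buf;
    // PUBLISH control packet type is 3 (upper nibble); DUP is assumed to be 0 here.
    buf.push_back(static_cast<char>(0x30 | (qos << 1) | (retain ? 1 : 0)));
    buf += encode_remaining_length(remaining);
    buf.push_back(static_cast<char>((topic_len >> 8) & 0xFF));  // length MSB
    buf.push_back(static_cast<char>(topic_len & 0xFF));         // length LSB
    return buf;
}
```

Because the remaining length takes at most 4 bytes, fields No 1-3 never exceed 7 bytes in total.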
If you store the outstanding request in a struct, I would add fields for the header and extra length fields for the topic and payload:
struct OutstandingMessage
{
uint8_t fixed_header[2];
uint8_t remaining_length[2];
uint8_t topic_length[2];
uint8_t packet_id[2];
std::pair<char const *, size_t> topic;
std::pair<char const *, size_t> payload;
boost::function<void()> complete;
OutstandingMessage(std::pair<char const *, size_t> const &_topic, std::pair<char const *, size_t> const &_payload, boost::function<void()> const &_complete)
: topic(_topic), payload(_payload), complete(_complete)
{ }
};
This works if the fields are fixed width or have a maximum size.
In the outstanding message you can also include a fixed size storage for the 'const_buffer_sequence'. Because you will always send either 3 or 4 chunks, correct ? That way you need only one allocation
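A sketch of this idea, assuming the header bytes and the chunk list both live inside the outstanding message so that no separate allocation is needed for them. `outstanding_message_sketch` and `chunk` are hypothetical names; the 7-byte header size follows from MQTT 3.1.1 (1 byte fixed header, at most 4 bytes remaining length, 2 bytes topic length).

```cpp
#include <array>
#include <cstddef>
#include <string>
#include <utility>

using chunk = std::pair<char const*, std::size_t>;  // (pointer, size), non-owning

struct outstanding_message_sketch {
    std::array<char, 7> header;     // fields No 1-3, at most 7 bytes
    std::size_t header_size;        // number of header bytes actually used
    std::array<char, 2> packet_id;  // field No 5
    bool has_packet_id;             // true for QoS 1 and 2
    chunk topic;                    // lifetime managed by the user
    chunk payload;                  // lifetime managed by the user
    std::array<chunk, 4> chunks;    // fixed-size storage for the buffer sequence
    std::size_t chunk_count;        // 3 or 4

    // Fills `chunks` in wire order. Some chunks point into this object, so call
    // this only once the object sits at its final address (e.g. inside the deque).
    void build_chunks() {
        chunk_count = 0;
        chunks[chunk_count++] = chunk(header.data(), header_size);
        chunks[chunk_count++] = topic;
        if (has_packet_id) {
            chunks[chunk_count++] = chunk(packet_id.data(), packet_id.size());
        }
        chunks[chunk_count++] = payload;
    }
};
```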
With the completion handler, I would recommend including an error code (std::function<void(mqtt::error_code)> completion_handler), and including feedback on whether the operation was successful (successful delivery of the packet), aborted (connection closed), timed out, failed, etc ...
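One possible shape for such a handler, as a sketch only. The `publish_result` enum, `pending_publish` struct and `abort_all` function are hypothetical; `mqtt::error_code` is not modelled here and a plain enum is swapped in instead.

```cpp
#include <deque>
#include <functional>
#include <utility>

// Hypothetical result codes reported to the completion handler.
enum class publish_result { success, aborted, timeout, failed };

using completion_handler = std::function<void(publish_result)>;

struct pending_publish {
    completion_handler handler;
};

// When the connection closes, every outstanding request is reported as aborted.
inline void abort_all(std::deque<pending_publish>& outstanding) {
    while (!outstanding.empty()) {
        // Remove the entry first so that a handler that publishes again is safe.
        completion_handler h = std::move(outstanding.front().handler);
        outstanding.pop_front();
        if (h) h(publish_result::aborted);
    }
}
```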
Thank you for the comments. They are very helpful. I will improve mqtt_cpp. I need some time to implement this.
You are welcome. Let me know when you have some results. Your test broker is also very interesting; I would be interested to see how it performs compared to mosquitto and emqttd.
I sent a PR to fix the issue as #127. Now you can use as::const_buffer
, a (ptr, size) based type, for topic_name and contents. Could you test it?
Here is the example (test) code: https://github.com/redboltz/mqtt_cpp/blob/fix_124_revised/test/as_buffer_async_pubsub.cpp#L105
Fixed by #127
I have been using your mqtt library for some time now. I do see higher memory usage than before, when I was using ZeroMQ. I think this has to do with heap fragmentation that is occurring. I see here now that you have been improving the memory handling of the packets.
I have run valgrind on my application, but I do not see any memory leaks. So it seems the memory is not really leaking. But some heap fragmentation does seem to occur. At the moment I am not sure what triggers this situation.