redboltz / mqtt_cpp

Boost Software License 1.0
432 stars 107 forks source link

Working toward a fully zero copy receive path #320

Closed jonesmz closed 5 years ago

jonesmz commented 5 years ago

This is related to https://github.com/redboltz/mqtt_cpp/issues/248, https://github.com/redboltz/mqtt_cpp/issues/191, and https://github.com/redboltz/mqtt_cpp/issues/289

I was working more with the v5 properties, and realized that when we receive the properties from the network we allocate a new buffer for the std::vector<> to store the mqtt::v5::property_varients, even if the property_varient only holds ref properties.

I'd like to see mqtt_cpp support zero allocations for handling the data from incoming packets.

We can do that with the following changes:

1) Store a pool of shared_ptr<char[]>, which each message is written into. 2) All callbacks that endpoint.hpp calls when passing messages up to the user level code pass "ref" objects, such as mqtt::string_view, mqtt::will_ref, mqtt::property_ref 3) The callbacks also pass an mqtt::any, which holds the shared_ptr<char[]> for the message that all of the ref objects refer to.

A more complicated, but "better" way to handle this is to:

1) Store a pool of shared_ptr<char[]>, which each divisible part of a message is written into. 2) This means that each message contents, username, password, v5::property, and so on are written into their own buffer. We'll need to have each buffer be contiguous so that higher level classes like string_view and stuff can refer to them directly, but we should be able to keep memory fragmentation down by setting a minimum buffer size of e.g. 256, and always retrieve buffers as multiples of twos. 3) All callbacks that endpoint.hpp calls when passing messages up to the user level code pass "ref" objects, such as mqtt::string_view, mqtt::will_ref, mqtt::properties. 4) Create some kind of mqtt::owning_string_view that implements the API from mqtt::string_view, but also holds a handle to the std::shared_ptr<char[]> 5) Modify mqtt::will and mqtt::property to always hold a reference to the std::shared_ptr<char[]>, and when they are created by user code, serialize directly into a newly allocated std::shared_ptr<char[]>. 6) Each of the callbacks that endpoint.hpp calls don't need to be modified because each of the arguments already holds a std::shared_ptr<char[]>.

Further, for the above handling of properties we have two ways to avoid allocating that std::vector.

We can either have std::vector<> pull it's storage from the same memory pool of char[]'s, or we can create a new custom data type "property_cursor" that has a pointer to the entire message, and iterates over the message on an as-needed basis to construct the property objects on the fly.

If we implement this by having each chunk of the message given to it's own buffer (more complicated, but would be better over all in my opinion), then we should have the std::vector<> use the memory pool as an allocator.

If we have each message stored in a single buffer, we should implement this using the "property_cursor" concept.

@redboltz I'd like to hear your thoughts on the matter.

redboltz commented 5 years ago

Could you give me some example?

I think https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901100 publish is better for the example.

redboltz commented 5 years ago

I expect as follows:

  1. allocate std::shared_ptr<char[1]> for fixed_header. (Or different way).
  2. call async_read() for 1 byte.
  3. allocate std::shared_ptr<char[4]> for remaining_length.
  4. call async_read() for 1 byte. (or 4 bytes?)
  5. ..... then read topic properties payload .. I want to know how many std::shared_ptr<char[]>s are allocated. And the relationship of the async_read bytes and allocated size.
jonesmz commented 5 years ago

In your example:

I expect as follows:

  1. allocate std::shared_ptr<char[1]> for fixed_header. (Or different way).

Not needed, since we only need that data on the stack.

  1. call async_read() for 1 byte.

Right, this is how it's currently done.

allocate std::shared_ptr<char[4]> for remaining_length.

Also unnecessary, since the length is only used on the stack, like we currently do things.

call async_read() for 1 byte. (or 4 bytes?)

Right, this is how it's currently done.

..... then read topic properties payload .. I want to know how many std::shared_ptr<char[]>s are allocated. And the relationship of the async_read bytes and allocated size.

For a publish message, there are 3 categories of things that would be given an std::shared_ptr<char[]>

  1. Topic
  2. Message
  3. Properties.

But not all properties would involve allocating a std::shared_ptr<char[]>;

Only the properties that have variable length contents, such as ContentType, UserProperty, CorrelationID, and so on, would have a std::shared_ptr<char[]> created.

For Connect messages we also have

  1. Username
  2. Password

What motivates me to discuss this is

  1. Copying is expensive
  2. Allocations are expensive (and can block)

Copying:

We mitigate copying being an expensive operation by ensuring that data enters the program (from boost::asio) directly into the one and only buffer that it will ever need to live in. Having the buffer stored as a std::shared_ptr<char[]> allows for access to the buffer to be shared among multiple data structures cheaply.

For example, we store topics in test_broker.hpp in multiple places, and those topics are stored for potentially a long time.

Messages and properties are stored in test_broker.hpp if they are retained, or as part of a will.

All of the data that's passed to a user program (server, client, whatever) can potentially be used for a long time as well, and allocating storage for them in this way allows for the program using mqtt_cpp to never need to copy the data, only pass the shared_ptr<> to the data around, which is significantly less expensive than copying large multi megabyte strings.

(The max size of a topic is 65536 bytes (64KB) and the max message size is 268,435,456 (256MB) )

Copying data that approach those sizes is very cpu intensive.


Allocations:

By having endpoint accept as one of it's class template, and constructor parameters, an allocator (defaulting to std::allocator, of course), and then allocating all memory allocations through the provided allocator, end user code can achieve better control over the allocation behavior of the program.

As I mentioned in my initial post: A good allocator design in this situation is to always return regions of memory in powers of two (Probably no smaller than 32bytes). You just round up to the nearest power of two any time user code requests an allocation.

Further, any time you need to allocate from the OS, you allocate many multiples of the requested size. E.g., if the requested allocation is X bytes, you round X to the next power of two, and then allocate 10x the result, which is stored in your memory pool as 9 chunks saved for later, and 1 chunk returned to the code requesting the allocation.

Finally, when your std::shared_ptr<char[]> is destructed, the custom deleter you provide to std::shared_ptr<char[]> returns the freed memory back to your allocator, instead of to the global allocator. You do this by calling std::allocate_shared, instead of std::make_shared. The memory returned like that becomes available for use in the next message processed.

But mqtt_cpp doesn't need to know anything about how the provided allocator behaves. All it needs to do is use it to allocate the storage for the memory buffers used to hold the various chunks of messages received from the network.


The way I would control the allocation behavior in my broker is like this:

1) One or more boost asio threads running mqtt_cpp::endpoints 2) One management thread (either running on boost::asio, or separate)

A single allocator would be provided to all mqtt_cpp::endpoints, and the broker.

When the allocator reaches a pre-specified low-water mark, a signal would be sent to the management thread indicating that more buffers are needed. The management thread would then allocate more memory, and feed the new buffers to the allocator. The reason this would be done on the management thread is to avoid waiting on the system allocator on the main processing thread.

If an mqtt_cpp::endpoint requests memory from the allocator and the allocator doesn't have enough available, the allocator does an allocation from the system immediately, since continuing processing is more important than waiting on another thread to provide memory.

Eventually the system would reach a steady state, where no allocations come from the system allocator, and would instead be served from the pooled buffers in the custom allocator.


What we need to do in order to support this is to

1) Modify mqtt_cpp::endpoint to use a custom allocator for all allocations (including containers from the standard library) 2) Modify mqtt_cpp::endpoint so that every variable-length part of a message is received directly into an std::shared_ptr<char[]>, so that user level code doesn't need to make copies, but instead just shares pointers.

redboltz commented 5 years ago

I think that basically this approach is good. Let me clarify that I understand the concept correctly. And I have some comments.

  1. Use std::shared_ptr<char[]> for string/binary types from the initial point. The initial point means before async_read() call, create std::shared_ptr<char[]> and then pass it toasync_read(). std::shared_ptr<char[]> work well with Boost.Asio async APIs. However, AFAIK, std::shared_ptr<char[]> is C++17 feature. boost::shared_ptr supports it. See https://www.boost.org/doc/libs/1_70_0/libs/smart_ptr/doc/html/smart_ptr.html#shared_array So, we might need to use mqtt::shared_ptr pattern.

  2. How do we know allocated size via allocated object? I think that there is no way to get size from std::shared_ptr<char[]>. Why we don't use std::shared_ptr<vector<char>> or std::shared_ptr<std::string>? I guess that minimize times of memory allocation. But I think that std::shared_ptr<std::string> is acceptable choice because it is supported in C++14 and easy to get allocated size. In addition, std::shared_ptr<char[]> cannot work with std::make_shared. But std::shared_ptr<std::string> can do. See https://wandbox.org/permlink/3adDc4YrqQwK4nAr I think that memory allocation times is even as std::shared_ptr<char[]>. What do you think?

  3. I guess that we create std::shared_ptr<char[]> for string/binary objects after we know their size. There are two types of length expression in MQTT. One is up to 4byte variable length, the other is fixed 2 bytes. For the former one, read one byte each async_read() until the finish indicator is detected. For the latter one, prepare 2 bytes member variables for size something like, char len_buf_[2] (similar to fixedheader). And call async_read() for two bytes. Then we know the size, and allocate the shared_ptr<char[]> with known size. Is that right?

jonesmz commented 5 years ago
  1. Yes, we'll read the size from the packet, and then allocate space, and then pass the allocated buffer directly to boost::asio.

I didn't know that shared_ptr to array type was C++17. It looks like there's a way to do this without using boost::shared_ptr, but I think boost::shared_ptr will support "allocate_shared" better, and that's probably important. So the mqtt::shared_ptr approach is reasonable.

https://stackoverflow.com/questions/13061979/shared-ptr-to-an-array-should-it-be-used

  1. Having too many levels of pointers can also cause performance problems. I don't want to have a shared_ptr -> string -> actual data. Instead it would be nice to have only shared_ptr -> actual data.

How about something like this? (Not a complete implementation...) https://pastebin.com/USspNpSs

  1. Yes, exactly.
redboltz commented 5 years ago
  1. It seems that the combination clang++ and libc++ doesn't work even if C++17. But boost works fine. mqtt::shared_ptr approach is fine for me. See https://wandbox.org/permlink/eiWC4ZPuzxDO1WkS

  2. I think that

struct size_sp { 
    mqtt::string_view as_string_view() const {
        std::string_view(size, data.get());
    }
    std::size_t size;
    mqtt::shared_ptr<char[]> data;
};

is good enough. What do you think?

jonesmz commented 5 years ago

1) Sure 2) Sure, that's sufficient. My version is a bit more user friendly though :-) I don't have a strong preference. Simple is fine.

BTW, It's important that the data is const after it's read out of boost::asio. If we dont, then users might edit the data in a way that isn't thread safe.

redboltz commented 5 years ago

I tried yet another implementation of 2.

See https://wandbox.org/permlink/duR5ze8iicTJms9X

What do you think?

jonesmz commented 5 years ago

Yea that looks like a good approach. Lets use that!

jonesmz commented 5 years ago

Though, I think for the constructor, it would be wise to do something like:

    buffer(mqtt::shared_ptr<const char[]> ptr, size_t size)
        : mqtt::string_view size_(ptr.get(), size)
        , ptr_(std::move(ptr))
        , size_(size)
    { }

We don't want the buffer class to ever have a writable buffer. It's basically a read only "view" class. Passing in a shared_ptr<const char[]> (instead of having the buffer class allocate it's own) guarantees that immutability property.

We could actually extend this a bit to have the buffer class contain not only a size, but also an offset. E.g. maybe someone only wants a portion of the buffer, after we've read it from boost::asio. If we can let them provide an offset / start position, then this buffer class can represent any arbitrary range inside of the buffer owned by ptr_.

Imagine a message that is comma separated values, for example. User code may want to grab the 3rd value out of the comma separated values, but that value is 10MB out of a 200MB string. If we support an offset indicator, the user can make a read-only substring that's attached to the same shared_ptr<const char[]> as the buffer that mqtt_cpp::endpoint provided to the callback, but which only represents the specific part of the string that they want.

jonesmz commented 5 years ago

Ah, actually, now that I think about it. We maybe shouldn't inherit from mqtt::string_view.

There is a lot of code out there that accepts std::string_view by value and not by reference. Inheriting from mqtt::string_view like this will cause slicing of the object. The std::shared_ptr<const char[]> won't be passed to the function, only the mqtt::string_view that's being inherited from.

If we use private inheritance to hide the fact that mqtt::buffer (or whatever we name it) is an mqtt::string_view, then all of those functions won't be called automatically.

But then again, that's kind of the whole point of mqtt::string_view, that it doesn't have the ownership information. So, it's probably totally fine.

redboltz commented 5 years ago

Could you show me actual working code ? It is easy to understand for me.

You can edit https://wandbox.org/permlink/duR5ze8iicTJms9X and then click run and share button, then update the permalink.

jonesmz commented 5 years ago

https://wandbox.org/permlink/8ohhfDwKHernxJhT


#include <iostream>
#include <vector>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>

namespace as = boost::asio;

namespace mqtt
{

class buffer : public std::string_view {
public:
    buffer(std::shared_ptr<const char[]> ptr, std::size_t const length, std::size_t const offset = 0)
     : std::string_view(ptr.get()+offset, length-offset)
     , ptr_(std::move(ptr))
     , length_(length)
     , offset_(offset)
    {
      // If offset > size, throw exception.
    }

    buffer substr(std::size_t offset, std::size_t length)
    {
        // if offset and length result in going out of bounds, throw an exception.
        return buffer(ptr_, offset_+offset, length);
    }

public:
    std::shared_ptr<const char[]> ptr_;
    std::size_t length_;
    std::size_t offset_;
};

} // namespace mqtt

int main() {
    // Common io_service
    as::io_service ios;

    // Common Config
    constexpr std::uint16_t port = 12345;

    // Server
    as::ip::tcp::acceptor ac(ios, as::ip::tcp::endpoint(as::ip::tcp::v4(), port));
    as::ip::tcp::socket ss(ios);
    std::function<void()> do_accept;
    std::vector<as::ip::tcp::socket> connections;

    do_accept = [&] {
        ac.async_accept(
            ss,
            [&]
            (boost::system::error_code const& e) {
                std::cout << e.message() << std::endl;
                // read size of topic from boost::asio. Assume 5 bytes.
                std::size_t topicLen = 5;
                // Allocate space to read the topic into.
                std::shared_ptr<char[]> topicBuf = std::make_shared<char[]>(topicLen);
                as::async_read(
                    ss,
                    as::buffer(topicBuf.get(), topicLen),
                    [&, topicBuf, topicLen](boost::system::error_code const& e, std::size_t) {
                        std::cout << e.message() << std::endl;
                        std::cout << mqtt::buffer(topicBuf, topicLen) << std::endl;

                        // read size of payload from boost::asio. Assume 7 bytes.
                        std::size_t payloadLen = 7;
                        // Allocate space to read the topic into.
                        std::shared_ptr<char[]> payloadBuf = std::make_shared<char[]>(payloadLen);
                        as::async_read(
                            ss,
                            as::buffer(payloadBuf.get(), payloadLen),
                            [payloadBuf, payloadLen](boost::system::error_code const& e, std::size_t) {
                                std::cout << e.message() << std::endl;
                                std::cout << mqtt::buffer(payloadBuf, payloadLen) << std::endl;
                            }
                        );
                    }
                );
            }
        );
                };
    do_accept();

    // Client
    as::ip::address address = boost::asio::ip::address::from_string("127.0.0.1");
    as::ip::tcp::endpoint ep(address, port);
    as::ip::tcp::socket cs(ios);
    cs.async_connect(
        ep,
        [&]
        (boost::system::error_code const& e) {
            std::cout << e.message() << std::endl;
            as::write(cs, as::buffer(std::string("topicpayload")));
        }
    );

    // Start
    ios.run();
}
redboltz commented 5 years ago

Thank you!

I think that this approach is very good.

  1. User can access buffer as read only.
  2. Zero copy.
  3. Simple.
  4. Offset functionality is useful.

Keeping original length and modify the copy of buffer as follows:

     : std::string_view(ptr.get()+offset, length-offset)

It's so elegant!

jonesmz commented 5 years ago

I think there is a bug in my constructor.

std::string_view(ptr.get()+offset, length-offset)

Should be std::string_view(ptr.get()+offset, length)

Because length is the length of the "string", not the length of the whole buffer.

redboltz commented 5 years ago

I updated your example.

See https://wandbox.org/permlink/v2ktJq5GsVeGdlbl

I think that your original constructor is right. But the arguments order at buffer() calling in substr() seems wrong.

    buffer substr(std::size_t offset, std::size_t length)
    {
        // if offset and length result in going out of bounds, throw an exception.
        return buffer(ptr_, offset_+offset, length);
    }

should be

    buffer substr(std::size_t offset, std::size_t length)
    {
        // if offset and length result in going out of bounds, throw an exception.
        return buffer(ptr_, length_, offset);
    }
redboltz commented 5 years ago

Still something wrong...

redboltz commented 5 years ago

Finally, I understand your constructor fix is right. In addition substr implementation should be fixed. I added updated code.

First, I wrote a test for buffer:

https://wandbox.org/permlink/YgLC8uTB6lKbq625

Then updated the code:

https://wandbox.org/permlink/2n19bFiohTsBymbY

It works as I expedted.

class buffer : public std::string_view {
public:
    buffer(std::shared_ptr<const char[]> ptr, std::size_t const length, std::size_t const offset = 0)
     : std::string_view(ptr.get()+offset, length)
     , ptr_(std::move(ptr))
     , length_(length)
     , offset_(offset)
    {
      // If offset > size, throw exception.
    }

    buffer substr(std::size_t offset, std::size_t length)
    {
        // if offset and length result in going out of bounds, throw an exception.
        return buffer(ptr_, length, offset_+offset);
    }

public:
    std::shared_ptr<const char[]> ptr_;
    std::size_t length_;
    std::size_t offset_;
};
redboltz commented 5 years ago

In order to correct out of range checking, I updated the buffer as follows: https://wandbox.org/permlink/WxNZ9s2Wf27RXlDr

offset_ is no longer needed. It is covered by string_view.

But I'd like to confirm the purpose of offset.

If offset is only a convenience for user reading the buffer, then offset_ isn't needed.

If offset is for endpoint.hpp developer (us), it might be needed. I mean allocate big memory such as all of remaining_length, and give a part of the memory to boost::async_read(). I think that we don't use such approach. I think that we allocate the memory for individual variable length binary/string such as ClientId, TopicFilter, Payload, Properties(string/binary type) and so on.

What do you think?

redboltz commented 5 years ago

The basic concept of my approach is member variables are always keep the initial values. And mqtt::string_view provides various view for users.

jonesmz commented 5 years ago

Offset would not be used by endpoint.jpp.

Actually, we don't even need to store length (aside from informing string_view) because shared_ptr knows the size of the allocation internally, no need to carry that around on our own.

jonesmz commented 5 years ago

How about this?

class buffer : public mqtt::string_view {
public:
    buffer(mqtt::shared_ptr_const_array ptr, std::size_t length)
        : mqtt::string_view(ptr.get(), length),
          ptr_(std::move(ptr))
    {
    }
    buffer(mqtt::shared_ptr_const_array ptr, mqtt::string_view view)
        : mqtt::string_view(view),
          ptr_(std::move(ptr))
    {
    }

    buffer substr(std::size_t offset, std::size_t length)
    {
        return buffer(ptr_, mqtt::string_view::substr(offset, length));
    }

private:
    mqtt::shared_ptr_const_array ptr_;
};

We could make buffer(mqtt::shared_ptr_const_array ptr, mqtt::string_view view) private to avoid mis-use, but I don't think there's any reason to do so.

There's no substantial reason that mqtt::buffer's shared_ptr needs to be what the provided string_view points to. It would be really strange if it wasn't, but any situation where mqtt::buffer converts to mqtt::string_view has no ownership semantics anyway, and there it doesn't matter what buffer the mqtt::string_view points to.

So we don't need to force users of the code to construct the mqtt::buffer as a string_view to the entire buffer if they really don't want to.

I'd say we can go even further and remove buffer(mqtt::shared_ptr_const_array ptr, std::size_t length), but you might prefer the simpler constructor.

jonesmz commented 5 years ago

I suppose that we could also technically do this:

class buffer : public mqtt::shared_ptr_const_array, public mqtt::string_view {
public:
    buffer(mqtt::shared_ptr_const_array ptr, std::size_t length)
        : mqtt::shared_ptr_const_array(std::move(ptr))
        , mqtt::string_view(mqtt::shared_ptr_const_array::get(), length)
    {
    }

    buffer(mqtt::shared_ptr_const_array ptr, mqtt::string_view view)
        : mqtt::shared_ptr_const_array(std::move(ptr))
        , mqtt::string_view(view)
    {
    }

    buffer substr(std::size_t offset, std::size_t length)
    {
        return buffer(*this, mqtt::string_view::substr(offset, length));
    }
};

template<class E, class T>
std::basic_ostream<E, T> & operator<< (std::basic_ostream<E, T> & os, buffer const & p)
{
    os << static_cast<mqtt::string_view const&>(p);
    return os;
}

Not saying that we should. Just that I think that would work.

Doing this provides user level code direct access to member functions of std::shared_ptr. Things like .get(), .reset(), and so on.

jonesmz commented 5 years ago

https://wandbox.org/permlink/IpUdPFjObfiCdKsf

redboltz commented 5 years ago

I'm still thinking about the comment: https://github.com/redboltz/mqtt_cpp/issues/320#issuecomment-511140614

This is a response to below 2 comments: https://github.com/redboltz/mqtt_cpp/issues/320#issuecomment-511141317 https://github.com/redboltz/mqtt_cpp/issues/320#issuecomment-511141513

Doing this provides user level code direct access to member functions of std::shared_ptr. Things like .get(), .reset(), and so on.

I don't understand above yet.

Consider the publish_handler.

Current code is

    using v5_publish_handler = std::function<
        bool(std::uint8_t fixed_header,
             mqtt::optional<packet_id_t> packet_id,
             mqtt::string_view topic_name,
             mqtt::string_view contents,
             std::vector<v5::property_variant> props)
    >;

and our idea before the comment is

    using v5_publish_handler = std::function<
        bool(std::uint8_t fixed_header,
             mqtt::optional<packet_id_t> packet_id,
             mqtt::buffer topic_name,
             mqtt::buffer contents,
             std::vector<v5::property_variant> props) // props contains mqtt::buffer
    >;

so far, so good.

But if we removed shared_ptr from mqtt:buffer, then we need to provide shared_ptr to users as follows:

    using v5_publish_handler = std::function<
        bool(std::uint8_t fixed_header,
             mqtt::optional<packet_id_t> packet_id,
             mqtt::buffer topic_name,
             mqtt::shared_ptr_const_array sp_topic_name,
             mqtt::buffer contents,
             mqtt::shared_ptr_const_array sp_topic_name,
             std::vector<v5::property_variant> props, // props contains mqtt::buffer
             std::vector<mqtt::shared_ptr_const_array> sp_props)
    >;

because when we implement broker, publish message is copied to all subscribers, and in order to avoid copy, we need mqtt::shared_ptr_const_array.

Am I missing something?

redboltz commented 5 years ago

Ah, I looked over this multiple inheritance. Never mind my comment above.

class buffer : public mqtt::shared_ptr_const_array, public mqtt::string_view {
redboltz commented 5 years ago

Doing this provides user level code direct access to member functions of std::shared_ptr. Things like .get(), .reset(), and so on.

I'm not sure it is worth to have. But I don't strongly disagree about that. It might help something unpredictable (at least for me currently) use case.

However, as you add the following operator,

template<class E, class T>
std::basic_ostream<E, T> & operator<< (std::basic_ostream<E, T> & os, buffer const & p)
{
    os << static_cast<mqtt::string_view const&>(p);
    return os;
}

some of operator should be provided.

Consider operator==.

    auto b = mqtt::buffer(sp, 8);
    b == b;

If we insert above comparison, a compile error is occurred. I think that ordinary users expects string_view comparison.

But advanced user might want to compare original pointer. It is the same as the ownership as long as we on't use shared_ptr's aliasing constructor https://en.cppreference.com/w/cpp/memory/shared_ptr/shared_ptr (8) We have no plan to use it.

So I think that mqtt::shared_ptr_const_array as a member variable is better than inherits from that.

buffer could have a getter for ptr_ e.g. get_shared_ptr_const_array() const { return ptr_; }.

redboltz commented 5 years ago

See https://wandbox.org/permlink/QzDwIvIuqmqyZN6F

I added back length check and ownership comparison.

The name get_shared_ptr_const_array() might depend on the type information too much. get_underlying_shared_ptr() could be better.

redboltz commented 5 years ago

BTW, if you don't mind, could you tell me where do you live? I'd like to know your timezone to communicate smoother. I live in Tokto, Japan. My timezone is JST.

jonesmz commented 5 years ago

I'm in Chicago. United States. (CST)

redboltz commented 5 years ago

I considered again. I think that the following constructor should be private to avoid misuse. substr() call it with border check but the direct call from user skip the checking.

    buffer(buffer const& other, mqtt::string_view view)
        : mqtt::string_view(view),
          ptr_(other.ptr_) {
    }

What do you think?

jonesmz commented 5 years ago

Sure, hiding that constructor sounds fine.

We could have the shared_ptr inheritance be private, and then only forward the specific functions that end user code might want.

Or we could completely hide the shared pointer. I guess I don't have a strong opinion on it.

redboltz commented 5 years ago

I started implementing.

redboltz commented 5 years ago

How to treat well sending case?

For example, the class will becomes

class will {
public:
    /**
     * @brief constructor
     * @param topic
     *        A topic name to publish as a will
     * @param message
     *        The contents to publish as a will
     * @param retain
     *        A retain flag. If set it to true, the contents is retained.<BR>
     *        See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038<BR>
     *        3.3.1.3 RETAIN
     * @param qos
     *        mqtt::qos
     */
    will(buffer topic,
         buffer message,
         bool retain,
         std::uint8_t qos,
         std::vector<v5::property_variant> props = {})
        :topic_(std::move(topic)),
         message_(std::move(message)),
         retain_(retain),
         qos_(qos),
         props_(std::move(props))
    {}

    /**
     * @brief constructor (QoS0)
     * @param topic
     *        A topic name to publish as a will
     * @param message
     *        The contents to publish as a will
     * @param retain
     *        A retain flag. If set it to true, the contents is retained.<BR>
     *        See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718038<BR>
     *        3.3.1.3 RETAIN
     */
    will(buffer topic,
         buffer message,
         bool retain = false,
         std::vector<v5::property_variant> props = {})
        :will(std::move(topic), std::move(message), retain, qos::at_most_once, std::move(props))
    {}

    /**
     * @brief constructor (retain = false)
     * @param topic
     *        A topic name to publish as a will
     * @param message
     *        The contents to publish as a will
     * @param qos
     *        mqtt::qos
     */
    will(buffer topic,
         buffer message,
         std::uint8_t qos,
         std::vector<v5::property_variant> props = {})
        :will(std::move(topic), std::move(message), false, qos, std::move(props))
    {}
    buffer const& topic() const {
        return topic_;
    }
    buffer& topic() {
        return topic_;
    }
    buffer const& message() const {
        return message_;
    }
    buffer& message() {
        return message_;
    }
    bool retain() const {
        return retain_;
    }
    std::uint8_t qos() const {
        return qos_;
    }
    std::vector<v5::property_variant> const& props() const {
        return props_;
    }
    std::vector<v5::property_variant>& props() {
        return props_;
    }

private:
    buffer topic_;
    buffer message_;
    bool retain_ = false;
    std::uint8_t qos_ = 0;
    std::vector<v5::property_variant> props_;
};

It works fine when CONNECT packet receiving case.

When a user want to create will message from scratch, user should be able to use the same will class. It is simple.

But how to create buffer topic, for example?

I think some helper function is required.

buffer make_buffer(mqtt::string_view v) {
    shared_ptr_array spa { new char[v.size()] };
    std::memcpy(spa.get(), v.data(), v.size());
    return buffer(std::move(spa), v.size());    
}

https://wandbox.org/permlink/yz8l2KGtVpriM49E

See the last line of the main().

What do you think?

redboltz commented 5 years ago

Related topic.

If buffer has manage their own lifetime, life_keeper is no longer needed?

    packet_id_t async_publish(
        as::const_buffer topic_name,
        as::const_buffer contents,
        mqtt::any life_keeper,
        std::uint8_t qos = qos::at_most_once,
        bool retain = false,
        async_handler_t func = async_handler_t()
    ):

could become

    packet_id_t async_publish(
        buffer topic_name,
        buffer contents,
        std::uint8_t qos = qos::at_most_once,
        bool retain = false,
        async_handler_t func = async_handler_t()
    ):

Implementing broker, buffer is good enough, I think. How about normal client application?

Can we move to buffer based sending API?

cons: sync qos0 publish, subscribe APIs can't avoid buffer allocation.

redboltz commented 5 years ago

Ah, sync APIs without store only accept string_view. It doesn’t require allocating memory. Also we can pass a buffer directly. It is implicitly sliced.

For storing things on sync API, such as qos 1,2 publish, and will, user creates buffer and passes it, no life keeper is required . For all async APIs, user creates buffer. It is the only once memory allocation. I think that it is a good approach.

What do you think?

jonesmz commented 5 years ago

How to treat will sending case?

I'm not really sure that we should be making mqtt::will as part of the sending API directly. I know that it's one of the arguments that can be passed to the connect and async_connect functions, but I'm wondering if it would instead be better to just provide overloads for those functions that accept additional arguments.

Anyway, if we're going to have mqtt::will as a parameter to connect and async_connect, then it would be nice to support doing it as a zero-copy API as well.

The problem with requiring the user use mqtt::buffer when sending a will is that this implies that creating the mqtt::buffer will require allocating and copying.

The ideal situation is that the user will be able manage the lifetime of their data if they want to, or have us manage the lifetime of their data if they don't.

mqtt::buffer is basically a "view to the data" plus a lifetime management variable.

If we change mqtt::buffer from std::shared_ptr<const char[]> to simply "mqtt::any", that would allow the end-user to provide us with whatever lifetime management object they want. If they provide us an std::string, we can "move" that std::string into a std::shared_ptr. If they provide us with an std::shared_ptr, that'll work as well.

That would looks like:

class buffer : public mqtt::string_view {
public:
    explicit buffer(mqtt::string_view view, mqtt::any lifetime = mqtt::any())
        : mqtt::string_view(view),
          lifetime_(std::move(lifetime))
    {
    }

    buffer substr(std::size_t offset, std::size_t length)
    {
        return buffer(lifetime_, mqtt::string_view::substr(offset, length));
    }

private:
    mqtt::any lifetime_;
};
jonesmz commented 5 years ago

If buffer has manage their own lifetime, life_keeper is no longer needed?

No, I don't think this will work with std::shared_ptr<const char[]>, since it would require the end user to manage their data in terms of std::shared_ptr<const char[]>

If we used mqtt::any instead of std::shared_ptr<const char[]>, then we could do that, I suppose.

In the current code, for the async_* functions, we already automatically convert std::string -> std::shared_ptr, which is then passed to the version of the function that accepts as::buffer and life_keeper.

If we replaced that version of the function with mqtt::buffer (aka, std::string_view, and std::any), internally we would still end up converting to the as::buffer and mqtt::any version of things anyway.

So it doesn't seem like it would do any good, in my opinion.

Another reason not to do that is that the end-user might have a single object that stores the actual data for all of the variables passed to the async_* function. If we only accepted mqtt::buffer, they'd have to provide multiple copies of the lifetime keeping object.

I think the design should be that when accepting data from the user, we only accept one lifekeeper, which gives the end user the most flexibility in how to manage their data. Either they have one thing that needs to be keep-alived, or they have multiple and they give them to use as an std::tuple. If we instead took multiple life-keepers, the end user would need to do weird things to make that work properly.

redboltz commented 5 years ago

Thank you for the comments.

I will keep current sending code. It can work well with buffer.

Another reason not to do that is that the end-user might have a single object that stores the actual data for all of the variables passed to the async_* function. If we only accepted mqtt::buffer, they'd have to provide multiple copies of the lifetime keeping object.

I agree with you.

I remember you mentioned about that the receiving case.

  1. This means that each message contents, username, password, v5::property, and so on are written into their own buffer. We'll need to have each buffer be contiguous so that higher level classes like string_view and stuff can refer to them directly, but we should be able to keep memory fragmentation down by setting a minimum buffer size of e.g. 256, and always retrieve buffers as multiples of twos.

It is possible but pretty difficult to implement I pushed https://github.com/redboltz/mqtt_cpp/blob/impl_320/include/mqtt/endpoint.hpp#L9405 for discussion. It is incomplete code. It is straight forward implementation using separate allocation. It becomes callback hell. Perhaps we can avoid it using future/promise but std::future is low performance. It create local thread as far as I know. Boost.Asio has their own future experimental support. But I'm not sure WebSocket Boost.Beast support it. At lease when the Beast was not part of the boost, it didn't support future. I also considered coroutine but I don't think it isn't product level yet.

How to switch separate allocation pattern or all at once allocation pattern? I think that something like this.

  1. Read until remaining length as existing way.
  2. Compare remaining length with threshold value e.g.256
  3. If the remaining length is less than 256, do allocate all at once process, otherwise allocate each element process.

The CONNECT packet is the most complex message in MQTT. So I chose it as the start point to find issues.

So far, I implemented a part of CONNECT receiving process using simple callback (hell) way.

What do you think?

redboltz commented 5 years ago

I don't like callback hell but if there is no good solution, I will choose the callback hell. It's my current opinion. I think that key points are 1.performance(speed), 2.WebSocket support, 3. avoid code duplication with allocate all at once version.

jonesmz commented 5 years ago

I think we can implement this using a finite state machine.

enum process_connect_phase
{
    phase_start, phase_header, phase_username, phase_username, phase_password, phase_password, phase_will, phase_end
}
struct ConnectData{ // flags, mqtt::optional<mqtt::buffer> username, mqtt::optional<mqtt::buffer> password, mqtt::optional<will>, mqtt::v5::properties };
void process_connect(async_handler_t func, process_connect_phase phase, ConnectData data = {})
{
    switch(phase)
    {
        case phase_start: // do any initialization here
        case phase_header:  boost::asio::async_read(..., [&](...) {
                                                        // verify that the header is valid.
                                                        // store flags in ConnectData
                                                        process_connect(std::move(func), phase_username, std::move(data));
                                                    });
        case phase_username: boost::asio::async(..., [&](...) {
                                                    // read username length
                                                    async_read_string(length, [](mqtt::buffer username) {
                                                        data.username = std::move(username);
                                                        process_connect(std::move(func), process_password, std::move(data));
                                                     });
                                                 });
        case will: // Call dedicated function that reads Wills and then calls a callback with the result.
                   // The callback will put the data retrieved into the ConnectData structure, and then
                   // call process_connect for the next phase.
        case properties: // Call dedicated function that reads properties and then calls a callback with the result.
                         // The callback will put the data retrieved into the ConnectData structure and then
                         // call process_connect for the next phase.
        case end: // any cleanup happens here
                  // and finally the end user's callback gets called with the contents of ConnectData.
    }
}
redboltz commented 5 years ago

Thank you for the advice. I think it is a good approach too. Incremental receiving process is a kind of skip-able sequence. So switch-case based state management is good enough.

In addition, it could help avoiding code duplication. If the message is small enough to receive all at once, we can call something like this ios.post([] { process_connect(..., string_view) });. The string_view is sliding (shifting).

I restart implementing by this way.

redboltz commented 5 years ago

PoC code https://wandbox.org/permlink/PjJkUfu1H2inOCut

jonesmz commented 5 years ago

In addition, it could help avoiding code duplication. If the message is small enough to receive all at once, we can call something like this ios.post([] { process_connect(..., string_view) });. The string_view is sliding (shifting).

How would we detect that the message is small enough to receive all at once?

Is there some function that can be called from boost::asio to ask how much data is available to read immediately?

The way I envisioned this working, is that each packet type has a known structure. So for packets like the connect packet, we would read the packet in this way:

  1. async_read the fixed header
  2. async_read the length of the username
  3. allocate space for username, async_read the username
  4. async_read the length of the password
  5. allocate space for password, async_read the password
jonesmz commented 5 years ago

It's unfortunate that the mqtt protocol doesn't list all of the length values at the beginning of the packet. Since that would allow us to call boost::asio::async_read with multiple different buffers all at once.

But there's no way to do that with the mqtt protocol :(

redboltz commented 5 years ago

How about remaining length ? https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901024

We need at most 5 times 1 byte read in the first phase. And then mqtt_cpp store it as a member variable remaining_length. It is expressed as size in my PoC code.

If size is less than lim then choose bulk read mode. Otherwise chunk read mode. In bulk read mode, all payload https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901030 is written on the one shared_ptr buffer. Each element such as topic_filter cab ve read via string_view using buffer mechanism.

If size is not less that lim then chunk read mode. That means read MQTT element that has small size such as first 10 bytes of CONNECT message payload using internal (member variable) buffer. If the reading process find variable length element such as string https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901010 then, read 2 bytes using internal buffer and get the string length. It is up to 65535 byte. Then allocate shared_ptr and call async_read. When async_read handler is called, call the parse function. It is expressed as step1 to step3 in my PoC code.The parsing functions are shared by bulk read mode and chunk read mode. In chunk read mode, after parsing , we get some information from the parse result. And then store them somewhere, I use tuple in my PoC code, and choose the next state depend on the result, then call the state machine function.

In chunk read mode, some case could be inefficient. For example, in SUBSCRIBE message, https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901168 the payload contains many topic filter entries and each of them are very small but the last one has 64K bytes. It is reflected to remaining length so the chunk read mode is used. As the result of that, shared_ptr is allocated for each topic filter entries. So far, I accept this inefficiency. I guess that it is rare case. Because the length of topic filters are not so different in my experience.

However, same thing could happen properties. It has more various length. Fortunately, before each properties, we can read property length field https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901028. It is the same format as remaining length. If it is small enough, we can allocate all properties on one shared_ptr. That is recursive pattern of MQTT message.

It is still just idea. I will write more PoC code and share with you.

redboltz commented 5 years ago

My answer is

How would we detect that the message is small enough to receive all at once?

It is remaining length. In addition we can use property length.

Is there some function that can be called from boost::asio to ask how much data is available to read immediately?

I don't understand why we need to know that. The version of async_read() I'm using is not call callback handler until expected bytes are received. So we don't need to care about read immediately. If the rest of message is not received, just waiting. But asynchronously waiting. So mqtt_cpp can do other things while waiting.

jonesmz commented 5 years ago

How about remaining length ? https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901024

Perfect!

I understand now.

If the total message size is below $limit, as specified by end-user code, then read the entire message as a single allocation.

If the total message size is above $limit, then allocate each variable length object separately.

I don't understand why we need to know that. The version of async_read() I'm using is not call callback handler until expected bytes are received. So we don't need to care about read immediately. If the rest of message is not received, just waiting. But asynchronously waiting. So mqtt_cpp can do other things while waiting.

I didn't understand before.

It is now clear that there is no need to do this.

Using the "remaining_length" field of the packet fixed header will work the way you explained.

redboltz commented 5 years ago

I noticed that my PoC code "chunk read" and "buld read" print is upside down, sorry.

Anyways, I want to share a subtle design choice.

See

void handle_receive(as::io_context& ioc, std::size_t size, std::size_t lim, cb_t cb) {
    if (size < lim) {
        std::cout << "chunk read" << std::endl; // misprint, actually buld read
        auto buf = std::make_shared<std::string>("", size);
        ioc.post(
            [buf, cb = std::move(cb)] {
                // Let's say buf is fullfilled
                std::string_view v1(*buf);
                auto r1 = step1(v1);
                auto v2 = v1.substr(r1.consumed);
                auto r2 = step2(v2);
                auto v3 = v2.substr(r2.consumed);
                auto r3 = step3(v3);
                assert(r3.consumed == v3.size());
                cb(r1.result, r2.result1, r2.result2, r3.result);
            }
        );
    }
    else {
        std::cout << "bulk read" << std::endl;
        handle_receive_impl(ioc,  size, std::move(cb));
    }
}

I considered three choices here.

At first, I want to use callback based approach both bulk read and chunk read because code duplication is minimized. However, there is problem. From where the callback should be called on bulk read. In the previous handler? It is not good because callback is stacking. If there are many properties or topic_filters on subscribe, then stack overflow could happen. The next idea is call io_context::post(). The argument is lambda expression that calls statemachine function with the next phase. However, post() en-queues the last of the io_context queue. So the other coming message could interleaved. In order to avoid it, we can use priority queue. I used to consider https://stackoverflow.com/questions/43385897/how-to-combine-strand-wrapper-and-priority-wrapper-on-boost-asio However, it is too complicated and requires some special notation in the main loop, not just ioc.run(). It's difficult to use.

Finally, I chose simple return value based approach. Sharing code is limited as step1() to step3() in my PoC code. But I believe it is reasonable choice.

jonesmz commented 5 years ago

However, there is problem. From where the callback should be called on bulk read.

  1. In the previous handler? It is not good because callback is stacking. If there are many properties or topic_filters on subscribe, then stack overflow could happen.
  2. The next idea is call io_context::post(). The argument is lambda expression that calls statemachine function with the next phase. However, post() en-queues the last of the io_context queue. So the other coming message could interleaved.
  3. In order to avoid it, we can use priority queue. I used to consider https://stackoverflow.com/questions/43385897/how-to-combine-strand-wrapper-and-priority-wrapper-on-boost-asio However, it is too complicated and requires some special notation in the main loop, not just ioc.run(). It's difficult to use.

I think 2. is the best.

In the chunk read model, we're always using callbacks, so no matter what we need to have the various infrastructure to support the callbacks.

At this link, the documentation of async-read says https://www.boost.org/doc/libs/1_65_0/doc/html/boost_asio/reference/async_read/overload1.html

The function call always returns immediately.

Which implies that the function call does no reading of data from the stream until after it returns, before it calls the provided callback.

This should mean that the operation for asynchronously reading the data is being added to the end of the io_context's queue, just like any other operation that uses io_context::post().

So the other coming message could interleaved.

I don't believe that this is possible. boost::asio tcp sockets are "streams" of data. So if any new data arrives before we've finished reading from the stream, the data that has arrived will be en-queued to the back of the queue. The data that we are currently reading will stay in place.

As long as we don't have any of the following:

  1. Multiple calls to boost::asio::async_read at the same time, on any number of threads
  2. A call to boost::asio::async_read and also boost::asio::read (The synchronous version) happening at the same time, on any number of threads,
  3. Multiple calls to boost::asio::read (The synchronous version) happening on two or more threads at the same time

Then I believe we are guaranteed to have the data read from the stream in order, with no possibility of messages overlapping.