redboltz / mqtt_cpp

Boost Software License 1.0

server.hpp, multiple tcp connects waiting #179

Closed jonesmz closed 5 years ago

jonesmz commented 5 years ago
    void do_accept() {
        if (close_request_) return;
        acceptor_.async_accept(
            socket_->lowest_layer(),
            [this]
            (boost::system::error_code const& ec) {
                if (ec) {
                    boost::system::error_code close_ec;
                    acceptor_.close(close_ec);
                    if (h_error_) h_error_(ec);
                    return;
                }
                auto sp = std::make_shared<endpoint_t>(std::move(socket_));
                if (h_accept_) h_accept_(*sp);
                renew_socket();
                do_accept();
            }
        );
    }

In this function, there's a small gap between when an incoming tcp connection is accepted, and the next socket is initialized.

I believe that async_accept is able to handle multiple sockets at once, and only one of them will be selected per incoming tcp connection.

One way to support multiple simultaneous incoming TCP connections is to keep the pending sockets in a std::array<std::unique_ptr, N>, where N is supplied as a template parameter to server and defaults to some small number, such as 5.

do_accept's current logic gets moved to do_accept_impl(), and do_accept calls do_accept_impl() in a for-loop, once per std::array index.

Then for each call to async_accept, we provide a different index into the std::array, and that index is passed on to the subsequent call to do_accept_impl().
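The proposal above could be sketched roughly as follows. This is only a Boost-free illustration of the slot-per-index idea, not the library's code: `accept_pool`, `fake_acceptor`, and `fake_socket` are hypothetical names, and `fake_acceptor` merely mimics the shape of `async_accept(socket, handler)`. In the real server the acceptor would presumably be a `boost::asio::ip::tcp::acceptor` and the slots would hold the server's own socket type.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-ins so the sketch compiles without Boost.
struct fake_socket { int id = 0; };

struct fake_acceptor {
    using handler_t = std::function<void(bool /*error*/)>;

    // Queue one pending accept, shaped like async_accept(socket, handler).
    void async_accept(fake_socket& s, handler_t h) {
        pending_.emplace_back(&s, std::move(h));
    }

    // Simulate one incoming connection: complete the oldest pending accept.
    bool incoming(int id) {
        if (pending_.empty()) return false;
        auto p = std::move(pending_.front());
        pending_.pop_front();
        p.first->id = id;
        p.second(false);
        return true;
    }

    std::size_t pending() const { return pending_.size(); }

private:
    std::deque<std::pair<fake_socket*, handler_t>> pending_;
};

template <typename Acceptor, typename Socket, std::size_t N = 5>
class accept_pool {
public:
    explicit accept_pool(Acceptor& ac) : ac_(ac) {}

    // Start N accepts, one per array slot.
    void do_accept() {
        for (std::size_t i = 0; i != N; ++i) do_accept_impl(i);
    }

    std::vector<std::unique_ptr<Socket>> const& connections() const {
        return connections_;
    }

private:
    // The former do_accept logic, parameterised by slot index.
    void do_accept_impl(std::size_t i) {
        assert(i < N);
        slots_[i] = std::make_unique<Socket>();
        ac_.async_accept(
            *slots_[i],
            [this, i](bool error) {
                if (error) return;
                // Hand the accepted socket over, then refill the same slot.
                connections_.push_back(std::move(slots_[i]));
                do_accept_impl(i);
            });
    }

    Acceptor& ac_;
    std::array<std::unique_ptr<Socket>, N> slots_;
    std::vector<std::unique_ptr<Socket>> connections_;
};
```

With `accept_pool<fake_acceptor, fake_socket, 3> pool(ac); pool.do_accept();`, three accepts are pending at once, and each completed accept immediately re-arms its own slot.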

redboltz commented 5 years ago

I think that if we apply this approach to the TCP server, the other servers (TCP TLS, WS, and WS TLS) should work the same way. The TLS servers (TLS, WS TLS) take ssl::context as a constructor parameter, which means it is provided from outside. And ssl::stream holds a reference to the ssl::context. https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/reference/ssl__stream/stream.html

I think that if several sockets (ssl streams) access the same ssl::context simultaneously, it may cause problems such as undefined behavior. The user could provide an array of (initialized) ssl::context objects, but that makes for a rather complicated interface. If the default size of the array is 1 (not 5), and we provide two constructor versions (one taking a single ssl::context, one taking a collection of ssl::context), I think that would be acceptable.

Any ideas?

jonesmz commented 5 years ago

It's not clear to me from the ssl::context documentation (https://www.boost.org/doc/libs/1_65_0/doc/html/boost_asio/reference/ssl__context.html) that there are any restrictions on using the same ssl context for more than one simultaneously accepted socket.

The way that mqtt/server.hpp currently uses the ssl context is that it creates a new socket_t with the ssl context, and then does an async_accept on the acceptor with that socket. But as soon as the connection is accepted, a new socket is created with the same context, even if the SSL handshake, or subsequent secure transport operations haven't yet been finished on the original socket.

The documentation on ssl::stream also says that it's thread safe, as long as the operations happen on the same strand. https://www.boost.org/doc/libs/1_65_0/doc/html/boost_asio/reference/ssl__stream.html Is that currently the case?

So I think that it's not necessary to have more than one ssl context, unless only the handshake uses the ssl context (which I don't think is the case, but perhaps I'm mistaken).

redboltz commented 5 years ago

Before talking about multithreading, I'd like to check my understanding. Even with a single thread, using multiple sockets with a single acceptor's async_accept() would improve performance, because the server can start the next accept sequence while the previous one is still being processed. Is that right?

I haven't read the ssl::context source yet, but I think the name "context" implies that it is a stateful object (*1). Let's say there are 3 steps in the context. I'm worried about the following scenario, which can happen even with a single thread.

  1. ctx_ is shared by TLS sockets s1 and s2.
  2. s1 accepts the TCP connection and then starts TLS step1. s1 sets ctx_ to step1.
  3. ios.run() dispatches an internal TLS socket event. It sets ctx_ to step2.
  4. s2 accepts the TCP connection and then starts TLS step1. s2 sets ctx_ to step1. At this point, ctx_ is overwritten unexpectedly.

*1 It's just my guess. We need to confirm it.

redboltz commented 5 years ago

I think that if ssl::context is just a set of read-only configuration parameters for ssl::stream, then it can be shared.
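The distinction between the two possibilities can be illustrated with a small model. This is a hypothetical sketch, not Boost.Asio code: `tls_config` and `tls_session` are made-up names that play the roles of a shared, read-only context and a per-connection stream. It assumes the design in which handshake progress is stored in each session rather than in the shared object, which is the case where sharing one context is safe.

```cpp
#include <cassert>
#include <string>

// Hypothetical model of a context: set up once, then only read.
struct tls_config {
    std::string certificate_file;
    std::string private_key_file;
    bool verify_peer;
};

// Hypothetical model of a stream: it refers to the shared config,
// but keeps its own handshake progress.
class tls_session {
public:
    explicit tls_session(tls_config const& cfg) : cfg_(cfg) {}

    // Advance this session's handshake by one step (3 steps in total).
    void advance() {
        assert(step_ < 3);
        ++step_;
    }

    int step() const { return step_; }
    tls_config const& config() const { return cfg_; }

private:
    tls_config const& cfg_;  // shared, never modified by the session
    int step_ = 0;           // per-connection state
};
```

In this model, two sessions built from the same `tls_config` can be at different handshake steps without overwriting each other, because nothing is written back to the shared object.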

redboltz commented 5 years ago

I wrote some POC code. It works as I expected.

TCP (no tls)

#include <iostream>
#include <vector>
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>

#include <boost/asio.hpp>

namespace as = boost::asio;

int main() {
    constexpr std::uint16_t port = 12345;
    auto ip = as::ip::address::from_string("127.0.0.1");

    as::io_context ioc;

    using socket_up = std::unique_ptr<as::ip::tcp::socket>;
    // Server
    std::size_t const num_of_accepting_sockets = 1;

    as::ip::tcp::acceptor ac(ioc, as::ip::tcp::endpoint(as::ip::tcp::v4(), port));
    // multiple sockets for async_accept
    std::vector<socket_up> ss;
    ss.reserve(num_of_accepting_sockets);
    for (std::size_t i = 0; i != num_of_accepting_sockets; ++i) {
        ss.emplace_back(std::make_unique<as::ip::tcp::socket>(ioc));
    }

    std::function<void(as::ip::tcp::socket&)> do_accept;
    std::vector<socket_up> connections;

    do_accept =
        [&] (as::ip::tcp::socket& s){
            ac.async_accept(
                s,
                [&]
                (boost::system::error_code const& e) {
                    std::cout << e.message() << std::endl;
                    assert(!e);
                    std::cout << "server connected : socket = " << std::hex << &s << std::endl;

                    auto it = std::find_if(ss.begin(), ss.end(), [&](auto const& e) { return e.get() == &s; });
                    assert(it != ss.end());

                    connections.emplace_back(std::move(*it));

                    // renew the socket then accept again
                    *it = std::make_unique<as::ip::tcp::socket>(ioc);
                    std::cout << "renewed socket = " << std::hex << &**it << std::endl;
                    do_accept(**it);
                }
            );
        };

    for (auto const& s : ss) do_accept(*s);

    // Client
    as::ip::tcp::endpoint ep(ip, port);
    as::ip::tcp::socket cs1(ioc);
    cs1.async_connect(
        ep,
        [&]
        (boost::system::error_code const& e) {
            assert(!e);
            std::cout << "client1 connected" << std::endl;
        }
    );

    as::ip::tcp::socket cs2(ioc);
    cs2.async_connect(
        ep,
        [&]
        (boost::system::error_code const& e) {
            assert(!e);
            std::cout << "client2 connected" << std::endl;
        }
    );

    // Start
    ioc.run();
}

TCP (tls)

#include <iostream>
#include <vector>
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>

namespace as = boost::asio;

int main() {
    constexpr std::uint16_t port = 12345;
    auto ip = as::ip::address::from_string("127.0.0.1");

    as::io_context ioc;

    using socket_up = std::unique_ptr<as::ssl::stream<as::ip::tcp::socket>>;
    // Server
    std::size_t const num_of_accepting_sockets = 2;

    as::ip::tcp::acceptor ac(ioc, as::ip::tcp::endpoint(as::ip::tcp::v4(), port));

    // single context
    as::ssl::context ctx(as::ssl::context::tlsv12);
    ctx.set_verify_mode(as::ssl::verify_none);
    ctx.use_certificate_file("server.crt", as::ssl::context::pem);
    ctx.use_private_key_file("server.key", as::ssl::context::pem);

    // multiple sockets for async_accept
    std::vector<socket_up> ss;
    ss.reserve(num_of_accepting_sockets);
    for (std::size_t i = 0; i != num_of_accepting_sockets; ++i) {
        ss.emplace_back(std::make_unique<as::ssl::stream<as::ip::tcp::socket>>(ioc, ctx));
    }

    std::function<void(as::ssl::stream<as::ip::tcp::socket>&)> do_accept;
    std::vector<socket_up> connections;

    do_accept =
        [&] (as::ssl::stream<as::ip::tcp::socket>& s){
            ac.async_accept(
                s.lowest_layer(),
                [&]
                (boost::system::error_code const& e) {
                    std::cout << e.message() << std::endl;
                    assert(!e);
                    std::cout << "server connected : socket = " << std::hex << &s << std::endl;
                    s.async_handshake(
                        as::ssl::stream_base::server,
                        [&]
                        (boost::system::error_code e) {
                            std::cout << e.message() << std::endl;
                            assert(!e);

                            auto it = std::find_if(ss.begin(), ss.end(), [&](auto const& e) { return e.get() == &s; });
                            assert(it != ss.end());

                            connections.emplace_back(std::move(*it));

                            // renew the socket then accept again
                            *it = std::make_unique<as::ssl::stream<as::ip::tcp::socket>>(ioc, ctx);
                            std::cout << "renewed socket = " << std::hex << &**it << std::endl;
                            do_accept(**it);
                        }
                    );
                }
            );
        };

    for (auto const& s : ss) do_accept(*s);

    // Client

    as::ip::tcp::endpoint ep(ip, port);

    as::ssl::context ctx1(as::ssl::context::tlsv12);
    ctx1.set_verify_mode(as::ssl::verify_none);
    as::ssl::stream<as::ip::tcp::socket> cs1(ioc, ctx1);
    cs1.lowest_layer().async_connect(
        ep,
        [&]
        (boost::system::error_code const& e) {
            assert(!e);
            std::cout << "client1 connected" << std::endl;
            cs1.async_handshake(
                as::ssl::stream_base::client,
                [&]
                (boost::system::error_code const& e) {
                    assert(!e);
                    std::cout << "client1 handshaked" << std::endl;
                }
            );
        }
    );

    as::ssl::context ctx2(as::ssl::context::tlsv12);
    ctx2.set_verify_mode(as::ssl::verify_none);
    as::ssl::stream<as::ip::tcp::socket> cs2(ioc, ctx2);
    cs2.lowest_layer().async_connect(
        ep,
        [&]
        (boost::system::error_code const& e) {
            assert(!e);
            std::cout << "client2 connected" << std::endl;
            cs2.async_handshake(
                as::ssl::stream_base::client,
                [&]
                (boost::system::error_code const& e) {
                    assert(!e);
                    std::cout << "client2 handshaked" << std::endl;
                }
            );
        }
    );

    // Start
    ioc.run();
}

redboltz commented 5 years ago

I'm not sure whether this approach can really improve connection processing speed, so I wrote a small benchmark.

#include <iostream>
#include <vector>
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <thread>
#include <chrono>

#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>

namespace as = boost::asio;

int main(int argc, char** argv) {
    if (argc != 4) {
        std::cout << "Usage: " << argv[0] << " accept_sockets server_threads clients" << std::endl;
        return -1;
    }

    auto const asize = boost::lexical_cast<std::size_t>(argv[1]);
    auto const tsize = boost::lexical_cast<std::size_t>(argv[2]);
    auto const csize = boost::lexical_cast<std::size_t>(argv[3]);

    auto rest = csize;

    constexpr std::uint16_t port = 12345;
    auto ip = as::ip::address::from_string("127.0.0.1");

    using socket_up = std::unique_ptr<as::ip::tcp::socket>;

    // Server
    as::io_context iocs;
    as::ip::tcp::acceptor ac(iocs, as::ip::tcp::endpoint(as::ip::tcp::v4(), port));
    // multiple sockets for async_accept
    std::vector<socket_up> ss;
    ss.reserve(asize);
    for (std::size_t i = 0; i != asize; ++i) {
        ss.emplace_back(std::make_unique<as::ip::tcp::socket>(iocs));
    }

    std::function<void(as::ip::tcp::socket&)> do_accept;
    std::vector<socket_up> connections;

    as::io_context::strand str(iocs);

    do_accept =
        [&] (as::ip::tcp::socket& s){
            ac.async_accept(
                s,
                as::bind_executor(
                    str,
                    [&]
                    (boost::system::error_code const& e) {
                        assert(!e);

                        auto it = std::find_if(ss.begin(), ss.end(), [&](auto const& e) { return e.get() == &s; });
                        assert(it != ss.end());

                        connections.emplace_back(std::move(*it));

                        // renew the socket then accept again
                        *it = std::make_unique<as::ip::tcp::socket>(iocs);
                        do_accept(**it);
                        if (--rest == 0) iocs.stop();
                    }
                )
            );
        };

    for (auto const& s : ss) do_accept(*s);

    std::vector<std::thread> ths;
    ths.reserve(tsize);
    for (std::size_t i = 0; i != tsize; ++i) {
        ths.emplace_back(
            [&] {
                iocs.run();
            }
        );
    }

    // Client
    as::io_context iocc;
    as::ip::tcp::endpoint ep(ip, port);

    auto start = std::chrono::system_clock::now();

    for (std::size_t i = 0; i != csize; ++i) {
        auto cs = std::make_shared<as::ip::tcp::socket>(iocc);
        cs->async_connect(
            ep,
            [&, cs]
            (boost::system::error_code const& e) {
                assert(!e);
            }
        );
    }

    // Start
    iocc.run();
    for (auto& t : ths) t.join();

    auto end = std::chrono::system_clock::now();
    auto diff = end - start;
    std::cout << "elapsed time = "
              << std::chrono::duration_cast<std::chrono::milliseconds>(diff).count()
              << " msec."
              << std::endl;
}

Environment:

Linux tokyoarch 5.0.0-arch1-1-ARCH #1 SMP PREEMPT Mon Mar 4 14:11:43 UTC 2019 x86_64 GNU/Linux

Compiler:

clang version 7.0.1 (tags/RELEASE_701/final)
Target: x86_64-pc-linux-gnu
Thread model: posix
InstalledDir: /usr/bin

Optimize option:

-O3

Result

For 15,000 clients:

accept sockets   server threads   ms (average)
1                1                2850
5                1                2850
1                5                2660
5                5                2670

It seems that the number of accepting sockets has no measurable effect on performance, while the number of server threads has a small effect.

NOTE: When I removed the strand from the async_accept() callback, I got a memory error.
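A plausible explanation for the memory error (an assumption, not confirmed in this thread) is that without the strand, handlers running on several iocs.run() threads modify ss, connections, and rest at the same time, which is a data race. The sketch below shows the same shape of problem using only the standard library. `record_connections` is a hypothetical helper, and a `std::mutex` stands in for the strand as the thing that serialises access to the shared vector.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Several threads append to one shared vector, as the accept handlers
// do with `connections`. The lock serialises the push_back calls.
std::size_t record_connections(std::size_t threads, std::size_t per_thread) {
    std::vector<int> connections;
    std::mutex mtx;

    std::vector<std::thread> ths;
    ths.reserve(threads);
    for (std::size_t t = 0; t != threads; ++t) {
        ths.emplace_back([&, t] {
            for (std::size_t i = 0; i != per_thread; ++i) {
                // Without this lock, concurrent push_back is undefined behaviour.
                std::lock_guard<std::mutex> lock(mtx);
                connections.push_back(static_cast<int>(t));
            }
        });
    }
    for (auto& th : ths) th.join();
    return connections.size();
}
```

With the lock in place, `record_connections(5, 1000)` returns 5000. Removing the lock makes the behaviour undefined, and a crash or heap corruption is one possible outcome.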

jonesmz commented 5 years ago

> Even if single threading, multiple sockets with single acceptor::async_accept() would improve the performance because the server can start the next accepting sequence during the previous accepting sequence is processing. Is that right?

Yes, I believe this is true. On some platforms, boost::asio offloads socket accepts to the operating system, so multiple accept sequences actually run in parallel, rather than one starting as soon as the previous one ends. But that depends on the platform being used.

> I don't read ssl::context source yet but I think that the name context implies it is stateful object (*1). Let's say there are 3 steps in the context. I'm afraid that the following scenario. It happens on single threading.

It might be, I'm not sure.

> 1. ctx_ is shared by TLS socket s1 and s2.
> 2. s1 accepts the tcp connection and then start TLS step1. s1 sets step1 to ctx_.
> 3. ios.run() dispatches internal TLS socket event. It sets step2 to ctx_.
> 4. s2 accepts the tcp connection and then start TLS step1. s2 sets step1 to ctx_. At this point, ctx_ is overwritten unexpectedly.
>
> *1 It's just my guess. We need to confirm it.

It depends on how ctx is used. I think that if you assign each of these operations to the same strand then it might be safe.

Your example code worked? That's good.

jonesmz commented 5 years ago

The reason why I brought this issue to your attention wasn't about performance. It was about the potential for an incoming connection to be lost due to the socket not being available immediately.

redboltz commented 5 years ago

> The reason why I brought this issue to your attention wasn't about performance.

Ok, I understand.

> It was about the potential for an incoming connection to be lost due to the socket not being available immediately.

I don't think so. Even if async_accept() has not been called yet, the client's connection is still processed.

Consider the following code:

#include <iostream>
#include <cassert>
#include <cstdint>
#include <memory>
#include <thread>
#include <vector>
#include <functional>

#include <boost/asio.hpp>

namespace as = boost::asio;

int main() {
    // Common Config
    constexpr std::uint16_t port = 12345;
    auto ip = as::ip::address::from_string("127.0.0.1");

    // Server
    as::io_context ioc;
    as::ip::tcp::acceptor ac(ioc, as::ip::tcp::endpoint(as::ip::tcp::v4(), port));

    std::function<void()> do_accept;
    std::vector<std::shared_ptr<boost::asio::ip::tcp::socket>> connections;
    boost::asio::ip::tcp::socket ss(ioc);

    std::size_t accept_count = 0;
    do_accept = [&] {
        std::cout << "[server] start accept" << std::endl;
        ac.async_accept(
            ss,
            [&]
            (boost::system::error_code const& e) {
                assert(!e);
                std::cout << "[server] connection accepted " << accept_count++ << std::endl;
                connections.emplace_back(std::make_shared<boost::asio::ip::tcp::socket>(std::move(ss)));
            }
        );
    };

    // Client
    as::ip::tcp::endpoint ep(ip, port);

    auto cs = std::make_shared<as::ip::tcp::socket>(ioc);
    std::cout << "[client] " << "async_connect" << std::endl;

    // client connects **before** server start accept
    cs->async_connect(
        ep,
        [&, cs]
        (boost::system::error_code const& e) {
            assert(!e);
            std::cout << "[client] " << "connected" << std::endl;
        }
    );

    // server start accept here
    do_accept();

    // Start
    ioc.run();
}

Running demo: https://wandbox.org/permlink/xYkoC9MpzqPg6e5x

redboltz commented 5 years ago

See https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/reference/basic_socket_acceptor/basic_socket_acceptor/overload3.html

This constructor creates an acceptor and automatically opens it to listen for new connections on the specified endpoint.
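The same behaviour can be checked without Boost.Asio. The sketch below is an illustration under stated assumptions, not code from the library: it uses plain POSIX sockets, so it assumes a POSIX platform, and `connect_before_accept` is a hypothetical helper name. It relies on the usual TCP behaviour that once a socket is listening, the operating system completes incoming handshakes and queues the connections until the application calls accept().

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cstring>

// Returns true if a client can finish connect() before the server has
// called accept(), and the server can still accept that connection later.
bool connect_before_accept() {
    int ls = ::socket(AF_INET, SOCK_STREAM, 0);
    if (ls < 0) return false;

    sockaddr_in addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;  // let the OS choose a free port

    socklen_t len = sizeof(addr);
    bool ok =
        ::bind(ls, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0 &&
        ::listen(ls, 16) == 0 &&
        // Read back the port that was actually assigned.
        ::getsockname(ls, reinterpret_cast<sockaddr*>(&addr), &len) == 0;

    int cs = -1;
    int conn = -1;
    if (ok) {
        cs = ::socket(AF_INET, SOCK_STREAM, 0);
        // Blocking connect: succeeds although accept() has not been called.
        ok = cs >= 0 &&
             ::connect(cs, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0;
    }
    if (ok) {
        // Only now does the server accept; the connection was already queued.
        conn = ::accept(ls, nullptr, nullptr);
        ok = conn >= 0;
    }

    if (conn >= 0) ::close(conn);
    if (cs >= 0) ::close(cs);
    ::close(ls);
    return ok;
}
```

If `connect_before_accept()` returns true, the client was connected while no accept was outstanding, which matches the Boost.Asio demo above: the gap between two async_accept() calls does not by itself drop a connection, as long as the listen queue is not full.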

jonesmz commented 5 years ago

Ok, if there's no need to have overlapping sockets, then there'd be no real benefit to doing this. Thank you for considering it :-)

redboltz commented 5 years ago

Thank you for understanding. While analyzing this issue, I learned a lot about Boost.Asio and TCP.