chriskohlhoff / asio

Asio C++ Library
http://think-async.com/Asio

Reasoning behind num_mutexes and the aftermath #147

Open devgs opened 8 years ago

devgs commented 8 years ago

Can you please explain why the number of mutexes used by strands must be limited?

This strategy leads to terrible consequences when your application has two features:

- some handlers may block for a long time, and
- a large number of strands is used in parallel.

Why not just use mutex pooling/recycling instead?
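
For reference, here is roughly the scheme I am asking about, paraphrased from how I understand detail/strand_service (the table size is 193 unless ASIO_STRAND_IMPLEMENTATIONS overrides it; the slot-selection hashing below is made up for illustration):

#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>

// All strands of an io_service draw from one fixed table of
// implementations; each strand object is mapped onto a slot, so two
// completely unrelated strands can share a slot's mutex and queue.
struct strand_impl
{
    std::mutex mutex;                          // shared by every strand in this slot
    std::queue<std::function<void()>> queue;   // handlers waiting behind the slot
};

static const std::size_t num_mutexes = 193;    // the limit in question
static strand_impl implementations[num_mutexes];

// Hypothetical slot selection: once a blocking handler occupies slot k,
// every other strand mapped to slot k stalls behind it.
strand_impl & allocate_implementation(std::size_t salt)
{
    return implementations[salt % num_mutexes];
}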

Here is a test case that shows the issue:

#include <asio/io_service.hpp>
#include <asio/strand.hpp>
#include <atomic>
#include <cstddef>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#include <unistd.h>  // sleep()

std::atomic<bool> running{true};
std::atomic<int> counter{0};

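// Non-blocking work: every do_the_work() allocates a fresh Work, and with it
// a fresh strand, and reposts itself through that strand.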
struct Work
{
    Work(asio::io_service & io_service)
        : _strand(io_service)
    { }

    static void start_the_work(asio::io_service & io_service)
    {
        std::shared_ptr<Work> _this(new Work(io_service));

        _this->_strand.get_io_service().post(_this->_strand.wrap(std::bind(do_the_work, _this)));
    }

    static void do_the_work(std::shared_ptr<Work> _this)
    {
        counter.fetch_add(1, std::memory_order_relaxed);

        if (running.load(std::memory_order_relaxed)) {
            start_the_work(_this->_strand.get_io_service());
        }
    }

    asio::io_service::strand _strand;
};

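// Blocking work: a single handler that sleeps for five seconds on whatever
// worker thread (and strand implementation) it lands on.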
struct BlockingWork
{
    BlockingWork(asio::io_service & io_service)
        : _strand(io_service)
    { }

    static void start_the_work(asio::io_service & io_service)
    {
        std::shared_ptr<BlockingWork> _this(new BlockingWork(io_service));

        _this->_strand.get_io_service().post(_this->_strand.wrap(std::bind(do_the_work, _this)));
    }

    static void do_the_work(std::shared_ptr<BlockingWork> _this)
    {
        sleep(5);  // block the worker thread and the strand for the whole test
    }

    asio::io_service::strand _strand;
};

int main(int argc, char ** argv)
{
    asio::io_service io_service;
    std::unique_ptr<asio::io_service::work> work{new asio::io_service::work(io_service)};

    for (std::size_t i = 0; i < 1000; ++i) {
        Work::start_the_work(io_service);
    }

    std::vector<std::thread> workers;

    for (std::size_t i = 0; i < 8; ++i) {
        workers.push_back(std::thread([&io_service] {
            io_service.run();
        }));
    }

    if (argc > 1) {
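        // Add one extra worker so the blocked thread does not shrink the pool.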
        std::cout << "Spawning a blocking work" << std::endl;
        workers.push_back(std::thread([&io_service] {
            io_service.run();
        }));
        BlockingWork::start_the_work(io_service);
    }

    sleep(5);
    running = false;
    work.reset();

    for (auto && worker : workers) {
        worker.join();
    }

    std::cout << "Work performed:" << counter.load() << std::endl;
    return 0;
}
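
For completeness, I build it roughly like this (standalone Asio on Linux assumed; adjust the include path as needed):

g++ -std=c++11 -pthread -I/path/to/asio/include asio_strand_test_case.cpp -o asio_strand_test_case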

Test run the usual way:

time ./asio_strand_test_case
Work performed:3183957

real    0m5.008s
user    0m15.224s
sys     0m3.332s

Test run with the long blocking work enabled:

time ./asio_strand_test_case 1
Spawning a blocking work
Work performed:195189

real    0m5.024s
user    0m0.920s
sys     0m0.164s

Note the throughput dropping from ~3.2M to ~195K handlers and the user time falling from ~15 s to under 1 s: every iteration creates a brand-new strand, so over the five seconds almost every work chain eventually hashes onto the implementation slot held by the blocking handler and just sits in its queue.
mabrarov commented 8 years ago

Per my understanding, this was done to limit the number of mutexes (OS resources, like non-paged pool) used by strands. I'm still looking for a lock-free strand. It could be LIFO or FIFO; I personally don't need a predictable order of execution for handlers that go through a strand, just non-concurrent execution that doesn't consume too many OS resources.
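
Something like the following sketch is what I have in mind: a per-strand atomic counter plus an intrusive MPSC stack, so strands share nothing, handlers never run concurrently, and each batch executes in LIFO order. None of these names are Asio API; it uses plain sequentially consistent atomics for simplicity and assumes handlers do not throw:

#include <asio/io_service.hpp>
#include <atomic>
#include <cstddef>
#include <functional>
#include <utility>

class lock_free_strand
{
public:
    explicit lock_free_strand(asio::io_service & io_service)
        : _io_service(io_service)
    { }

    void post(std::function<void()> handler)
    {
        // Count first, publish second: drain() relies on every published
        // node already being counted in _pending.
        bool first = (_pending.fetch_add(1) == 0);

        node * n = new node{std::move(handler), _head.load()};
        while (!_head.compare_exchange_weak(n->next, n))
            ;  // on failure n->next is refreshed with the current head

        // Only the poster that took _pending from 0 to 1 schedules a
        // drain, so at most one drain is queued or running at any time:
        // that is the whole non-concurrency guarantee.
        if (first)
            _io_service.post([this] { drain(); });
    }

private:
    struct node
    {
        std::function<void()> fn;
        node * next;
    };

    void drain()
    {
        // Take every node published so far (handlers run in LIFO order).
        node * n = _head.exchange(nullptr);
        if (!n) {
            // A poster incremented _pending but has not pushed yet;
            // yield instead of spinning on this worker thread.
            _io_service.post([this] { drain(); });
            return;
        }

        std::size_t count = 0;
        while (n) {
            node * next = n->next;
            n->fn();
            delete n;
            n = next;
            ++count;
        }

        // More handlers arrived while we were running? Keep draining.
        // Otherwise _pending hits 0 and the next poster reschedules us.
        if (_pending.fetch_sub(count) != count)
            _io_service.post([this] { drain(); });
    }

    asio::io_service & _io_service;
    std::atomic<std::size_t> _pending{0};
    std::atomic<node *> _head{nullptr};
};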