yyzybb537 / libgo

Go-style concurrency in C++11
MIT License
3.17k stars 756 forks source link

[boost asio async] . vs [boost asio sync + libgo] #130

Open magicsupery opened 5 years ago

magicsupery commented 5 years ago

假如linux机器下,40个cpu,写一个dispatch转发的程序,请问下按照你的经验,哪个会更快些?

yyzybb537 commented 5 years ago

后者

magicsupery commented 5 years ago

ok,我准备写例子验证一下

magicsupery commented 5 years ago

届时会在这里讨论结果,3q

magicsupery commented 5 years ago

include "hub_connection.hpp"

include <glog/logging.h>

namespace ins {

define DEFAULT_WRITE_CHAN_BUFFER_NUM 100

#define HEAD_SIZE 4
HubConnection::HubConnection(boost::asio::io_service&  io_service):
    socket_(io_service),
    write_chan_(DEFAULT_WRITE_CHAN_BUFFER_NUM),
    closed_(false)
{
    LOG(INFO) << __FUNCTION__ << " "<< this;
}

HubConnection::~HubConnection()
{
    LOG(INFO) << __FUNCTION__ << " " << this;
    socket_.close();
}

void HubConnection::start()
{
    go std::bind(&HubConnection::read, shared_from_this());
    go std::bind(&HubConnection::write, shared_from_this());
}

void HubConnection::read()
{
    LOG(INFO) << __FUNCTION__ << " " << this << " start read";

    while(!closed_)
    {
        boost::system::error_code error;

        auto size = socket_.read_some(boost::asio::buffer(read_buffer_), error);
        if(!error)
        {
            LOG(INFO) << __FUNCTION__ << " " << this << " read input size " << size;

            write_chan(std::make_shared<std::string>(read_buffer_));

        }
        else
        {
            LOG(ERROR) << __FUNCTION__ << " " << this << " read error " << error.message();
            closed_ = true;
        }
    }

    LOG(INFO) << __FUNCTION__  << " " << this << " end read";

}

void HubConnection::write()
{
    LOG(INFO) << __FUNCTION__  << " " << this << " start write ";
    while(!closed_)
    {
        data_buf_ptr buf_ptr;
        bool is_success = write_chan_.TimedPop(buf_ptr, std::chrono::seconds(1));
        if(is_success)
        {
            socket_.write_some(boost::asio::buffer(buf_ptr->c_str(), buf_ptr->size()));
        }
    }

    LOG(INFO) << __FUNCTION__ << " " << this << " end write ";
}

}

这里依赖asio实现了一个connection。这里在write那里有个trick,必须调用channel的TimePop,否则会阻塞在这里无法destruct; 之前考虑过另一种写法是,这里可以阻塞,但是在read结束或者出错的时候,可以将两个go task 取消,但是找API没有找到对应的方法,想问下用1的方式是否会有问题?

runner365 commented 5 years ago

writechan.Close() 在推出的时候,外部调用这个,可以吗?

echoface commented 5 years ago

其实在c++世界里, 跨线程调度的协程是非常危险的, 在实际业务逻辑代码层, 反而不如有规划的合理安排资源, 对协程底层如何调度缺乏理解和本身编码能力不是非常丰富的开发人员来讲, 最后对项目本省更加混乱.

目前为了充分的利用CPU与多核多线程的优势同时又为了获得coroutine这种可以用户态控制的切入切出的能力, 只能在自己公司又经验的人自己开发一套. 自己系统内部使用. 目前看到能工作且工作很好的基本都是这样一类. bthread/ libgo 或者其他的. 单从使用的安全性上讲, 各种这些M:N(带或者不带任务跨线程调度机制)的各种库,远远达不到“安心”用的状态. 里面都有各种个样的需要避免的东西, 而这些使用者是不知道的. 很多甚至开发者自己也罗列不出来, 只有别人出了问题. 提到了之后, 开发者才会说:“哦, 这里因为xxx的原因,不能这样调度"

yyzybb537 commented 5 years ago

可以使用writechan.Close()

yyzybb537 commented 5 years ago

其实在c++世界里, 跨线程调度的协程是非常危险的, 在实际业务逻辑代码层, 反而不如有规划的合理安排资源, 对协程底层如何调度缺乏理解和本身编码能力不是非常丰富的开发人员来讲, 最后对项目本省更加混乱.

目前为了充分的利用CPU与多核多线程的优势同时又为了获得coroutine这种可以用户态控制的切入切出的能力, 只能在自己公司又经验的人自己开发一套. 自己系统内部使用. 目前看到能工作且工作很好的基本都是这样一类. bthread/ libgo 或者其他的. 单从使用的安全性上讲, 各种这些M:N(带或者不带任务跨线程调度机制)的各种库,远远达不到“安心”用的状态. 里面都有各种个样的需要避免的东西, 而这些使用者是不知道的. 很多甚至开发者自己也罗列不出来, 只有别人出了问题. 提到了之后, 开发者才会说:“哦, 这里因为xxx的原因,不能这样调度"

首先请明确“安心用”和“无脑用”的区别,即使是系统syscall这样级别的东西,也是不能无脑用的,你也要了解很多API细节(比如signal处理函数要可重入),不可能在对底层一无所知的情况下hold住大项目。 但是系统syscall是可以"安心"用的,这个安心的前提是经过充分测试,大多数场景不会出现问题。 在我看来,有充分测试的或商用项目检验过的开源软件都是可以“安心”用的,因为大多数场景下已经不会有bug了,即使遇到偶发的bug使用者都可以自主fix。 你认为的跨线程调度协程的危险性其实是来源于多线程编程的复杂性,近乎9成的程序员都是hold不住这种复杂性的,但是有些场景下又不得不使用多线程来提升性能。 libgo把多线程编程的复杂性都封装在了CSP模型的背后,对程序员屏蔽了这些细节,是用来降低开发门槛的。协程本身带来的技术细节还是比较少的,是远远低于多线程编程和异步编程的复杂性的,总体而言还是利大于弊的!

magicsupery commented 5 years ago

我写了两个最基础的程序进行测试,一个是用了io_service的异步通信版本,一个是用了io_service与libgo的同步版本,代码如下: async_asio的异步版本:

include <boost/asio.hpp>

include

include

include

include

define client_num 15000

define send_num 2000

static const uint16_t port = 43333; using namespace boost::asio; using namespace boost::asio::ip; using boost::system::error_code; using namespace std::chrono;

tcp::endpoint addr(address::from_string("127.0.0.1"), port); std::vector<std::shared_ptr> io_services; std::vector<std::shared_ptr> works;

char buf[1024]; auto start_time = std::chrono::system_clock::now(); using socket_ptr = std::shared_ptr;

std::atomic_int conn_num(0); io_service& get_random_ios() { return *(io_services[rand() % io_services.size()].get()); }

void read_handler(socket_ptr s, const boost::system::error_code& ec, size_t size) { if(!ec) { buf[size] = '\0'; s->async_read_some(buffer(buf), std::bind(read_handler, s, std::placeholders::_1,std::placeholders::_2)); auto now = std::chrono::system_clock::now(); std::cout << " interval is " << std::chrono::duration_cast(now - start_time).count() << " ms conn"<< conn_num.load() <<") read(" << buf <<")" << std::endl; } else { std::cout << "wrong read_handler " << ec.message() << std::endl; } }

void accept_handler(socket_ptr s, tcp::acceptor& acc, const boost::system::error_code& ec) { if(!ec) { conn_num.fetch_add(1); s->async_read_some(buffer(buf), std::bind(read_handler, s, std::placeholders::_1,std::placeholders::_2));

    std::cout<<"create connection " << conn_num.load() << std::endl;    

    socket_ptr s_ptr(new tcp::socket(get_random_ios()));
    acc.async_accept(*(s_ptr.get()), 
            std::bind(accept_handler, s_ptr, std::ref(acc), std::placeholders::_1));
}
else
{
    std::cout << " wrong accept_handler " << ec.message() << std::endl;
}

} void echo_server(tcp::acceptor& acc) { socket_ptr s_ptr(new tcp::socket(get_random_ios())); acc.async_accept(*(s_ptr.get()), std::bind(accept_handler, s_ptr, std::ref(acc), std::placeholders::_1)); }

void write_handler(socket_ptr s, int i, const boost::system::error_code& ec, size_t size) { if(!ec) { if(i < send_num) { s->async_write_some(buffer("1234"), std::bind(write_handler, s, ++i, std::placeholders::_1, std::placeholders::_2)); } } else { std::cout << " wrong write_handler " << ec.message() << std::endl; } }

void connect_handler(socket_ptr s, const boost::system::error_code& ec) { if(!ec) { s->async_write_some(buffer("1234"), std::bind(write_handler, s, 0, std::placeholders::_1, std::placeholders::_2)); } else { std::cout << " wrong connect_handler" << ec.message() << std::endl; } }

void client() { for(int i=0;i < client_num; i++) { socket_ptr s_ptr(new tcp::socket(get_random_ios())); s_ptr->async_connect(addr, std::bind(connect_handler, s_ptr, std::placeholders::_1)); } }

int main() {

for(unsigned int i = 0; i < std::thread::hardware_concurrency(); i++)
{
    io_services.push_back(std::make_shared<io_service>());
}

std::vector<std::shared_ptr<std::thread>> threads;

for(auto ios_ptr: io_services)
{

    std::shared_ptr<io_service::work>  work_ptr(new io_service::work(*(ios_ptr.get())));
    works.push_back(work_ptr);
    std::shared_ptr<std::thread>  thread_ptr(new std::thread(
            [ios_ptr]
            {
                ios_ptr->run();
            }));

    threads.push_back(thread_ptr);

}

start_time = std::chrono::high_resolution_clock::now();
tcp::acceptor acc(get_random_ios());
acc.open(addr.protocol());
acc.bind(addr);
acc.listen(client_num);

echo_server(acc);
client();
for(auto thread_ptr: threads)
{
    thread_ptr->join();
}

return 0;

}

以下是libgo版本

include <boost/asio.hpp>

include

include

include

include <libgo/coroutine.h>

//#include "../src/client/client.hpp" //#include "../src/service/client_service.hpp"

define client_num 15000

define send_num 2000

define _ip "127.0.0.1"

static const uint16_t port = 9876; using namespace boost::asio; using namespace boost::asio::ip; using boost::system::error_code; using namespace std::chrono;

tcp::endpoint addr(address::from_string("127.0.0.1"), port); std::vector<std::shared_ptr> io_services;

auto start_time = std::chrono::system_clock::now(); co::atomic_t conn_num(0); co::atomic_t conn_success_num(0);

io_service& get_random_ios() { return *(io_services[rand() % io_services.size()].get()); }

void echo_server() { tcp::acceptor acc(get_random_ios()); acc.open(addr.protocol()); acc.bind(addr); acc.listen(client_num);

for (;;) {
    std::shared_ptr<tcp::socket> s(new tcp::socket(get_random_ios()));
    acc.accept(*s);
    conn_num.fetch_add(1);
    go [s]{
        char buf[1024];
        for(;;)
        {
            error_code e;
            s->read_some(buffer(buf), e);
            if(!e)
            {
                auto now = std::chrono::system_clock::now();
                std::cout << " interval is " <<
                    std::chrono::duration_cast<milliseconds>(now - start_time).count() <<
                    " ms conn("<<conn_num<<")"<< " read("<<buf<<")"<< std::endl;
            }
            else
            {
                return;
            }
        }
    };
}

}

void client() { for(int i=0;i < client_num; i++) { go [] { error_code ec; tcp::socket s(get_random_ios()); s.connect(addr, ec); conn_success_num.fetch_add(1); std::cout << "connect success(" << conn_success_num.load() << ")" << std::endl; if(ec) { std::cout << "connect wrong " << ec.message() << std::endl; return; }

        std::string msg = "1234";

        for(int i = 0; i< send_num; i++)
        {
            s.write_some(buffer(msg), ec);
            if(ec)
            {
                std::cout<<"write wrong " << ec.message() << std::endl;
                return;
            }
        }
    };
}

}

/* void hub_client() {

for(int i=0;i < client_num; i++)
{
    go[]
    {
        auto client_ptr = std::make_shared<ins::Client>
            (get_random_ios(), _ip, port);
        client_ptr->connect();

        auto  msg = std::make_shared<std::string>("1234");
        for(int i = 0; i< send_num; i++)
        {
            client_ptr->test_write(msg);
        }
    };
}

} */

int main() {

for(unsigned int i = 0; i < std::thread::hardware_concurrency(); i++)
{
    io_services.push_back(std::make_shared<io_service>());
}
start_time = std::chrono::high_resolution_clock::now();
go echo_server;
go client;

co_sched.Start(std::thread::hardware_concurrency());
return 0;

}

================== 在同一台电脑测试下,在2w个连接,发送300条消息时候 1.效率上: 异步版本是比同步版本快的,并且CPU利用率上看非常平均;libgo版本经常出现某几个cpu高,其他cpu低。 2.CPU使用率 异步版本的CPU使用率远远高于libgo版本

测试的时候需要注意调高全连接队列长度。

我的猜测: libgo的cpu使用率较低是因为协程的switch开销小于线程?但是cpu没有均衡开并不知道原因,能否给出几个合理猜测呢?

==== 最后抱歉下,代码格式无法插入大段代码,我尝试去调整下格式

yyzybb537 commented 5 years ago

先跑单线程的试试看。

libgo的Start设置的是协程调度器的线程数量,网络线程数量在其他地方设置

magicsupery commented 5 years ago

先跑单线程的试试看。

libgo的Start设置的是协程调度器的线程数量,网络线程数量在其他地方设置

我去找找网络线程数量的函数

magicsupery commented 5 years ago

先跑单线程的试试看。

libgo的Start设置的是协程调度器的线程数量,网络线程数量在其他地方设置

你好,请问下在哪里调整网络io呢,没找到

yyzybb537 commented 5 years ago

更新后参加sample5