mfavant / tubekit

NEW PROJECT https://github.com/crust-hub/avant
MIT License
0 stars 0 forks source link

bug: donot socket_handler->remove(socket_ptr) in task #28

Closed gaowanlu closed 3 months ago

gaowanlu commented 3 months ago

to remove socket_handler->remove(socket_ptr) in stream_task http_task websocket_task

gaowanlu commented 3 months ago

using wait_remove_queue ,read write cache.

gaowanlu commented 3 months ago
    // connection is close
    if (nullptr == t_stream_connection || t_stream_connection->is_close())
    {
        if (t_stream_connection)
        {
            singleton<connection_mgr>::instance()->remove(socket_ptr);
            singleton<socket_handler>::instance()->remove(socket_ptr);
        }
        return;
    } 

在connection上加个字段,标识是否在connection_mgr中,当时多次调用connection_mgr remove,而sockethandler remove必须设计方案让主线程去调用

gaowanlu commented 3 months ago

关于如何优化,在任何地方都能安全的向某个connection 发送内容 ,关键点其实在于保证connection已经从connection_mgr remove了,就不应该再将内容写到connection了,最佳方案目前能想出的只是,将数据写到另一个地方和connection分开存放,有发数据就将内容写到待发送,然后触发一个task今而connection被处理,此时connection从另一个地方拉取数据到自己中,需要保证connection的身份,可以为每一个新连接connection 搞一个uuid,发数据时直接指定 uuid 和 待发送的内容即可,uuid 与 待发送 绑定,当connection在connection mgr中时可以向uuid发数据,否则不能发数据,connection 关闭时 同时也应该应该删除uuid,待发数据,防止再次被写入内容,导致僵尸数据。总之就是让connection在自己的task中拉自己的数据,connection的操作只能在自己的task线程中进行,socket的相关操作应该放到sockethandler 的loop中去进行。数据进行隔离,向uuid发数据要保证线程安全,向uuid写内容时就要保证uuid不会被移走,这样来 任意一个地方向某个connection发消息 不用锁全局的connection_mgr导致时刻只有一个connection能被写

gaowanlu commented 3 months ago

上面仍存在问题 uuid的管理 也要全局锁,颗粒度太大了 每个worker有一个自己的MailBox 根据connection可以hash到相应的worker 将内容发到worker的MailBox中去然后在 worker线程有一个connection_task queue,同样有一个mail queue 二者可以共用一个queue,在基类task中区分下类型是 处理connection的task还是mailbox task 是mailbox task 在执行task时则将content写到对应的connection中去 此时操作connection是线程安全的,因为connection的任何操作都会在其hash到的worker线程中进行 然而socket的remove关于与回收必须移动至主线程即epoll_wait线程中去 因为epoll_wait监听fd时 另外一个线程将fd close掉这是非常2B的行为 建立新连接时 先 创建socket object 然后创建 connection object 关闭连接时 先 remove connection object, 然后才会 remove socket object 还有一点 在queue中connection task是有多个因为有很多不同的connection要处理 而mailbox task其实需要一个就行 在处理mailbox task处理时 会从自身list中读出所有mail 然后处理 如果connection 的buffer满了没法把mail写到connection则跳过 处理那个mail 记录ERR日志或者关闭connection mail本身的存储可以直接用string,但是string最大存多大数据 最大可以存多少个string 也是个问题

gaowanlu commented 3 months ago
// g++ main.cpp -o main.exe --std=c++11 -lpthread
#include <iostream>
#include <list>
#include <thread>
#include <mutex>
using namespace std;

class Mail
{
public:
    void *pSocket;
    std::string str;
};

std::list<Mail> mMailList1;
std::list<Mail> mMailList2;

int main()
{
    // init
    std::list<Mail> *pReadMailList = &mMailList1;
    std::list<Mail> *pWriteMailList = &mMailList2;
    bool productStop = false;
    bool existMail = false;
    std::mutex swapMutex;

    uint64_t mailCnt = 0;
    // product thread
    auto product = std::thread([&]()
                               {
                                do{
                                   if (productStop)
                                   {
                                       return;
                                   }

                                   // write
                                   // lock
                                   swapMutex.lock();
                                   for (int i = 0; i < 10; i++)
                                   {
                                        mailCnt++;
                                        Mail Mail;
                                        Mail.str = std::move(std::string(1024000, 'c'));
                                        pWriteMailList->push_back(std::move(Mail));
                                   }
                                   existMail = true;
                                   swapMutex.unlock();
                                   // lock free

                                   // sleep
                                   std::this_thread::sleep_for(std::chrono::milliseconds(1)); 
                                }while(true); });
    // lock free
    product.detach();

    uint loopCnt = 0;
    auto now = std::chrono::system_clock::now();
    auto startTime = std::chrono::system_clock::to_time_t(now);
    std::cout << "StartTime " << startTime << std::endl;

    while (true)
    {
        if (!existMail)
        {
            // sleep
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            continue;
        }
        loopCnt++;
        if (loopCnt > 10000)
        {
            productStop = true;
            break;
        }

        // lock
        swapMutex.lock();
        pReadMailList = (pReadMailList == &mMailList1) ? &mMailList2 : &mMailList1;
        pWriteMailList = (pWriteMailList == &mMailList1) ? &mMailList2 : &mMailList1;
        existMail = false;
        swapMutex.unlock();
        // lock free

        // process
        if (!pReadMailList->empty())
        {
            // std::cout << "MailList size " << pReadMailList->size() << std::endl;
            for (auto iter = pReadMailList->begin(); iter != pReadMailList->end(); ++iter)
            {
                // std::cout << "MailStrDataSize " << iter->str.size() << std::endl;
            }
        }
        pReadMailList->clear();

        // sleep
        std::this_thread::sleep_for(std::chrono::milliseconds());
    }

    now = std::chrono::system_clock::now();
    auto endTime = std::chrono::system_clock::to_time_t(now);
    std::cout << "EndTime " << endTime << std::endl;
    std::cout << "Interval " << endTime - startTime << std::endl;
    std::cout << "MailCnt " << mailCnt << std::endl;

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    return 0;
}
gaowanlu commented 3 months ago

暂时已 优化 safe socket and connection