mfavant / tubekit

NEW PROJECT https://github.com/crust-hub/avant
MIT License
0 stars 0 forks source link

feat: better todo #30

Closed gaowanlu closed 3 months ago

gaowanlu commented 3 months ago

better buffer

  1. don't record time.
  2. using ring buffer, not move_to_before.

better worker_pool::assign

  1. When there is a gid task in the worker's task_queue, it should not be pushed to the worker's task_queue.
gaowanlu commented 3 months ago
// 超时断开可以单独设计一个组件,定时tick进行检查,当连接有效处理时就会向组件更新下最新时间
// tick时获取超过一定上限的连接标识,对这些连接进行mark_close,进而会分配到worker进行处理
// 这样一来无论是什么http ws stream,都可以用,需要设计处理 请求协议发一半 如HTTP请求报文发一半的超时情况一定时间没有解析完请求报文
// 则应该超时关闭,其他协议类似
gaowanlu commented 3 months ago

Firstly,worker_pool::assign

gaowanlu commented 3 months ago

OK=>don't record time.(https://github.com/crust-hub/tubekit/commit/9ac8a1a455cbc1c45119b31ad0d766535f03b018) OK=>better worker_pool::assign(https://github.com/crust-hub/tubekit/commit/9ac8a1a455cbc1c45119b31ad0d766535f03b018)

TODO:

  1. using ring buffer, not move_to_before.
  2. // 超时断开可以单独设计一个组件,定时tick进行检查,当连接有效处理时就会向组件更新下最新时间 // tick时获取超过一定上限的连接标识,对这些连接进行mark_close,进而会分配到worker进行处理 // 这样一来无论是什么http ws stream,都可以用,需要设计处理 请求协议发一半 如HTTP请求报文发一半的超时情况一定时间没有解析完请求报文 // 则应该超时关闭,其他协议类似
gaowanlu commented 3 months ago

the best 0f988a84b272d4460401bc95eb47b61

除了mainthread on_tick(挂载一些非常不耗时的任务) 还可以设计 workerthread on_tick 提供worker已经管理的conn作为参数

gaowanlu commented 3 months ago

accept多个 可以搞个随机策略

gaowanlu commented 3 months ago

bool websocket_connection::sock2buf() { static char inner_buffer[1024] = {0}; while (true) { int oper_errno = 0; int len = socket_ptr->recv(inner_buffer, 1024, oper_errno); if (len == -1 && oper_errno == EAGAIN) {

inner buffer有问题 stream conn同理

gaowanlu commented 3 months ago

而且inner buffer没有被全部写入recv_buffer,也会导致数据丢失问题,当recv_buffer满了后不应再从socket进行recv,有数据要读则一直监听读,不断处理,如果recv满了,都没被取出属实应该关闭此连接了,违反最大限制了

gaowanlu commented 3 months ago
// LOG_ERROR("do_task %llu", socket_ptr->get_gid());

// Submit the task to the queue of task_dispatcher
singleton<worker_pool>::instance()->assign(new_task, socket_ptr->get_gid()); 有问题 线程不安全,socket的 gid 可能已经变了
gaowanlu commented 3 months ago

先有socket后有conn 创建socket和conn时将socket2mapiing加入将conn2mapping加入 gid<->(sock,conn) sock<->gid conn<->gid 先删conn后删socket 删除conn时将conn2mapping删除 同时删除socket2mapping gid<->(sock,conn) sock<->gid conn<->gid 做一个gid socketptr connptr 之间的关系约束 原子性加入或删除 这么以来 do_task时输入参数 直接用 gid 先判断下gid有没有对应(sock,conn)有才派task m_data直接用gid 在worker接收到任务时 从task中获取gid 然后从mapping中获取到 sock,gid,conn 如果没有则说明连接已经关闭了 非本线程对conn操作时 为了保证安全直接提供锁mapping的操作 mapping对外提供 拷贝现有所有gid的操作,有了mapping之后conn_mgr就没什么用了,其实就是把conn_mgr改为了mapping conn内的buffer仍旧有锁 为了保持高效 为每一个worker都搞个mapping 只有通过gid然后查询(sock,conn)和挂载lamda的操作 通过gid即可映射确定是那个worker进而直接找到其mapping 直接用数组存就好了 这样就避免了锁全局 只锁了一个worker的mapping

gaowanlu commented 3 months ago
#include <iostream>
#include <mutex>
#include <unordered_map>
#include <functional>
#include <vector>
using namespace std;

class gid2mapping;

class safe_mapping
{
private:
    friend class gid2mapping;

    safe_mapping()
    {
    }

    ~safe_mapping()
    {
    }

    void if_exist(uint64_t gid,
                  std::function<void(uint64_t, std::pair<void *, void *>)> succ_callback,
                  std::function<void(uint64_t)> failed_callback)
    {
        lock.lock();
        auto iter = gid2pair.find(gid);
        if (iter != gid2pair.end())
        {
            if (succ_callback)
            {
                succ_callback(iter->first, iter->second);
            }
        }
        else
        {
            if (failed_callback)
            {
                failed_callback(gid);
            }
        }
        lock.unlock();
    }

    void remove(uint64_t gid,
                std::function<void(uint64_t, std::pair<void *, void *>)> succ_callback,
                std::function<void(uint64_t)> failed_callback)
    {
        lock.lock();
        auto iter = gid2pair.find(gid);
        if (iter != gid2pair.end())
        {
            auto first = iter->first;
            auto second = iter->second;
            gid2pair.erase(iter);
            if (succ_callback)
            {
                succ_callback(first, second);
            }
        }
        else
        {
            if (failed_callback)
            {
                failed_callback(gid);
            }
        }
        lock.unlock();
    }

    void insert(uint64_t gid,
                std::pair<void *, void *> value,
                std::function<void(uint64_t, std::pair<void *, void *>)> succ_callback,
                std::function<void(uint64_t, std::pair<void *, void *>)> failed_callback)
    {
        lock.lock();
        auto iter = gid2pair.find(gid);
        if (iter != gid2pair.end())
        {
            if (failed_callback)
            {
                failed_callback(gid, value);
            }
        }
        else
        {
            gid2pair[gid] = value;
            if (succ_callback)
            {
                succ_callback(gid, value);
            }
        }
        lock.unlock();
    }

    std::unordered_map<uint64_t, std::pair<void *, void *>> gid2pair{};
    std::mutex lock;
};

class gid2mapping
{
public:
    gid2mapping(uint32_t thread_size) : m_thread_size(thread_size)
    {
        m_safe_mapping = new safe_mapping[thread_size];
    }

    ~gid2mapping()
    {
        if (m_safe_mapping)
        {
            delete[] m_safe_mapping;
        }
    }

    void if_exist(uint64_t gid,
                  std::function<void(uint64_t, std::pair<void *, void *>)> succ_callback,
                  std::function<void(uint64_t)> failed_callback)
    {
        uint64_t hash_idx = gid % m_thread_size;
        m_safe_mapping[hash_idx].if_exist(gid, succ_callback, failed_callback);
    }

    void remove(uint64_t gid,
                std::function<void(uint64_t, std::pair<void *, void *>)> succ_callback,
                std::function<void(uint64_t)> failed_callback)
    {
        uint64_t hash_idx = gid % m_thread_size;
        m_safe_mapping[hash_idx].remove(gid, succ_callback, failed_callback);
    }

    void insert(uint64_t gid,
                std::pair<void *, void *> value,
                std::function<void(uint64_t, std::pair<void *, void *>)> succ_callback,
                std::function<void(uint64_t, std::pair<void *, void *>)> failed_callback)
    {
        uint64_t hash_idx = gid % m_thread_size;
        m_safe_mapping[hash_idx].insert(gid, value, succ_callback, failed_callback);
    }

    safe_mapping *m_safe_mapping{nullptr};
    const uint32_t m_thread_size{0};
};

int main()
{
    gid2mapping mmaping(6);
    uint32_t succ_cnt = 0, failed_cnt = 0;
    for (int i = 0; i < 10; i++)
    {
        mmaping.insert(
            i,
            {nullptr, nullptr},
            [&succ_cnt](uint64_t key, std::pair<void *, void *> value)
            {
                succ_cnt++;
                cout << "insert " << key << " succ" << endl;
            },
            [&failed_cnt](uint64_t key, std::pair<void *, void *> value)
            {
                failed_cnt++;
                cout << "insert " << key << " failed" << endl;
            });
        cout << succ_cnt << " " << failed_cnt << endl;
    }
    for (int i = 0; i < 10; i++)
    {
        mmaping.insert(
            i,
            {nullptr, nullptr},
            [](uint64_t key, std::pair<void *, void *> value)
            {
                cout << "insert " << key << " succ" << endl;
            },
            [](uint64_t key, std::pair<void *, void *> value)
            {
                cout << "insert " << key << " failed" << endl;
            });
    }
    for (int i = 0; i < 10; i++)
    {
        mmaping.if_exist(
            i,
            [](uint64_t key, std::pair<void *, void *> value)
            {
                cout << "if_exist " << key << " succ" << endl;
            },
            [](uint64_t key)
            {
                cout << "if_exist " << key << " failed" << endl;
            });
    }
    for (int i = 0; i < 10; i++)
    {
        mmaping.remove(
            i,
            [](uint64_t key, std::pair<void *, void *> value)
            {
                cout << "remove " << key << " succ" << endl;
            },
            [](uint64_t key)
            {
                cout << "remove " << key << " failed" << endl;
            });
    }
    for (int i = 0; i < 10; i++)
    {
        mmaping.if_exist(
            i,
            [](uint64_t key, std::pair<void *, void *> value)
            {
                cout << "if_exist " << key << " succ" << endl;
            },
            [](uint64_t key)
            {
                cout << "if_exist " << key << " failed" << endl;
            });
    }
    // 如if_exit可以do_task内 与 task::run开始 使用
    // 如insert可以在创建连接时使用
    // 如remove可以在task::run内关闭连接时使用
    return 0;
}
// insert 0 succ
// 1 0
// insert 1 succ
// 2 0
// insert 2 succ
// 3 0
// insert 3 succ
// 4 0
// insert 4 succ
// 5 0
// insert 5 succ
// 6 0
// insert 6 succ
// 7 0
// insert 7 succ
// 8 0
// insert 8 succ
// 9 0
// insert 9 succ
// 10 0
// insert 0 failed
// insert 1 failed
// insert 2 failed
// insert 3 failed
// insert 4 failed
// insert 5 failed
// insert 6 failed
// insert 7 failed
// insert 8 failed
// insert 9 failed
// if_exist 0 succ
// if_exist 1 succ
// if_exist 2 succ
// if_exist 3 succ
// if_exist 4 succ
// if_exist 5 succ
// if_exist 6 succ
// if_exist 7 succ
// if_exist 8 succ
// if_exist 9 succ
// remove 0 succ
// remove 1 succ
// remove 2 succ
// remove 3 succ
// remove 4 succ
// remove 5 succ
// remove 6 succ
// remove 7 succ
// remove 8 succ
// remove 9 succ
// if_exist 0 failed
// if_exist 1 failed
// if_exist 2 failed
// if_exist 3 failed
// if_exist 4 failed
// if_exist 5 failed
// if_exist 6 failed
// if_exist 7 failed
// if_exist 8 failed
// if_exist 9 failed
gaowanlu commented 3 months ago

https://github.com/crust-hub/tubekit/commit/4e7ea0a90f683090508948e51b5d9e8eaba829d2 https://github.com/crust-hub/tubekit/commit/4e7ea0a90f683090508948e51b5d9e8eaba829d2

gaowanlu commented 3 months ago

HTTP: 超文本传输协议(HTTP)有一个最大标头大小限制为16KB。这意味着HTTP请求或响应的标头不能大于16KB。然而,没有对HTTP体的大小限制。

gaowanlu commented 3 months ago

// 缓冲区大小 const int BUFFER_SIZE = 4096; // HTTP 报文缓冲区 char buffer[BUFFER_SIZE]; // HTTP 报文缓冲区已使用长度 size_t buffer_used_len = 0;

// HTTP 报文解析器 http_parser parser; // HTTP 报文解析器设置 http_parser_settings settings;

// 读取客户端数据 size_t n = recv(client_socket, buffer + buffer_used_len, BUFFER_SIZE - buffer_used_len, 0); if (n > 0) { // 更新缓冲区已使用长度 buffer_used_len += n; // 处理缓冲区中的数据 while (buffer_used_len > 0) { // 执行 HTTP 报文解析 http_parser_execute(parser, settings, buffer, buffer_used_len); // 更新缓冲区已使用长度 buffer_used_len -= parser.size; // 移动缓冲区数据 memmove(buffer, buffer + parser.size, buffer_used_len); } }

gaowanlu commented 3 months ago

process_connection,根本无需copyall因为,在此函数中 recvbuffer 是线程安全的,没有谁此时会操作recvbuffer