Open gaowanlu opened 2 months ago
#include <iostream>
#include <vector>
#include <string>
#include <thread>
#include <chrono>
#include <unordered_set>
#include <unordered_map>
using namespace std;
class Buffer
{
public:
Buffer() : data1(10),
data2(10),
reader_used_len(10),
writer_used_len(10),
reader_idx(0),
writer_idx(0),
data_reader(&data1),
data_writer(&data2)
{
}
void CacheSwap()
{
// 判断是否读完了
if (reader_idx != reader_used_len)
{
return;
}
// 没有其他数据
if (writer_used_len == 0)
{
return;
}
reader_idx = 0;
reader_used_len = writer_used_len;
writer_used_len = 0;
writer_idx = 0;
// swap cache
std::vector<std::string> *data_ptr = data_reader;
data_reader = data_writer;
data_writer = data_ptr;
read_out_idx = 0;
}
std::string &Push()
{
if (writer_idx == writer_used_len)
{
if (writer_used_len == data_writer->size()) // full
{
data_writer->push_back("");
}
writer_used_len++;
}
std::string &res = data_writer->at(writer_idx);
writer_idx++;
res.clear();
return res;
}
const std::string &First()
{
if (reader_idx == reader_used_len) // empty
{
if (data_reader->empty())
{
data_reader->push_back("");
return data_reader->at(0);
}
else
{
data_reader->at(0).clear();
}
return data_reader->at(0);
}
return data_reader->at(reader_idx);
}
void Pop()
{
if (reader_idx <= reader_used_len)
{
data_reader->at(reader_idx).clear();
data_reader->at(reader_idx).resize(0);
reader_idx++;
read_out_idx++;
}
}
bool Empty()
{
return (reader_idx >= reader_used_len);
}
public:
std::vector<std::string> data1;
std::vector<std::string> data2;
size_t reader_used_len;
size_t writer_used_len;
size_t reader_idx;
size_t writer_idx;
std::vector<std::string> *data_reader;
std::vector<std::string> *data_writer;
size_t read_out_idx;
};
class Conn
{
public:
Buffer recvBuffer;
Buffer sendBuffer;
// sock
// recvFirstByteIdx
// sendFirstByteIdx
};
int main(int argc, const char **argv)
{
std::vector<Conn> connArray(100);
std::unordered_set<int> connUsing;
std::unordered_set<int> connNotUse;
for (std::size_t index = 0; index < connArray.size(); index++)
{
connNotUse.insert(index);
}
std::unordered_map<int, int> sockfd2Conn;
std::unordered_map<int, int> gid2sockfd;
// constexpr int bufferCount = 20000;
// Buffer bufferArray[bufferCount];
// int bufferArrayIdx = 0;
// size_t allBytes = 0;
// do
// {
// if (bufferArrayIdx + 1 >= bufferCount)
// {
// bufferArrayIdx = 0;
// }
// else
// {
// bufferArrayIdx++;
// }
// Buffer &buffer = bufferArray[bufferArrayIdx];
// for (int loop = 0; loop < 100; loop++)
// {
// // 写内容
// buffer.CacheSwap();
// for (int i = 0; i < 11; i++)
// {
// std::string &Item = buffer.Push();
// Item = std::string(10240, 't');
// }
// // 读内容
// buffer.CacheSwap();
// // 读内容
// while (!buffer.Empty())
// {
// const std::string &Item = buffer.First();
// allBytes += Item.size();
// buffer.Pop();
// }
// }
// std::cout << buffer.data_reader->size() << " " << buffer.data_writer->size() << std::endl;
// std::cout << allBytes / (1024 * 1024 * 1024) << "GB" << std::endl;
// // std::this_thread::sleep_for(std::chrono::milliseconds(4));
// } while (true);
return 0;
}
[ ] rpc逻辑设计
一个主线程 main 多个worker1 worker2 worker3 worker4 worker5 为线程编号 5 0 1 2 3 4 其中线程之间两两都能进行通信采用 socketpair 如上面的例子直接创建 main-worker1 main-worker2 main-worker3 main-worker4 main-worker5 main与worker之间通信则直接用二者之间的专属UNIX域套接字 如果两个worker之间通信则 worker1->main->worker2 worker2->main->worker1
主线程负责listen socket 由epoll进行IO多路复用 epoll负责 (listensock)、(main-worker...)、(main-other)的收发读写 对于线程之间的消息收发缓存队列直接用std::vector好了可以采用read-write形式,其中std::string和std::vector采用read-write都能得到空间复用,而且不需要加锁了
但是需要为std::vector包装一层 成为Buffer内部具有读指针写指针
每个worker有自己的事件循环、有自己epoll,epoll需要管理worker-main
worker也是需要有任务类型一说的 在worker创建时需要提供类型 HTTP TCPSTREAM WEBSOCKET
worker需要为每个套接字连接维护事件循环、fd、 一个socket对象、一个connection对象
其中需要为每个connection对应设计sendbuffer与recvbuffer 当然这就不用为buffer加锁了,因为操作connection只会在worker内部,不存在多个线程同时操作同一个连接
要操作也是通过协议收发,进行线程之间的通信
对于buffer的设计则是根据不同的业务 HTTP TCPSTREAM WEBSOCKET单独设计 例如HTTP仍旧采用callback形式 TCPSTREAM WEBSOCKET 则需要设计来自不同线程的协议包,包括自身线程 接收到来自线程的数据 写进connection,能写进去则写,空间不够则应该在connection拉链,可以直接采用std::vector方式,和上面一样,总之就是线程之间的通信buffer组件了
同样设计read-write 注意这里的read-write不需要加锁了 直接指针交换即可 当有数据时,需要为相应连接触发下事件
像其他的业务处理都比较相似了,解包也不是差很多,包括嵌入SSL等等
关于进程通信还是不要内置提供了,毕竟需要提供一定的自由选择 (main worker worker worker)<----->center<------>(main worker worker worker) 由main进行RPC就好了 main<--RPC-->main,需要提供的对外接口就是, main<---->调用接口 RPC的fd需要mainepoll监听处理,对于RPC直接提供一个RPClistenFD,与server listen fd同等地位,只不过RPC listenfd accept到的连接交给mainepoll处理,RPC提供的连接 断开收发触发也是交给mainepoll 需要对外提供一个写数据到main线程的方式,所以对外提供一个socketpair就好了,一端提供给外部 一端留给main epoll自己 main 留一个回调,当有数据从外部写给main时,就触发,在回调内可以线程安全的将其发送给任意的worker 几乎是比较完美的actor模型了,差不多一切都是异步,即worker-other ,我们根本不知道other是谁,只接受来自other的消息就好了
关于主线程listen socker accept到的fd也是 通过消息传递给worker的 对于最大连接数的控制,直接采用std::atomic就好了,在不同线程间进行增减与大小判断,其实足够了,能够在指定大小范围内控制即可
main accept一个则进行+1 worker断开一个则进行-1
进而即可控制,当超过一定数量accept直接拒接close即可
对于为每个连接生成gid,采用原来方式即可,事件戳+轮询uint32基本就满足了,不仅要为连接生成gid,只要是sock都应生成gid sockpair也需要
对于程序完美停止 采用信号主线程处理 worker应屏蔽信号 主线程收到信号会通过标志位+忙等sleep worker已关闭标志位 双方通过标志位奔赴 标志位采用std::atomic即可
worker关闭时需要提供hook,client连接关闭时也需要提供hook
还应当提供停止接收新连接 + 踢掉所有已经存在的连接的方法
停止接收新连接通过other-main即可
踢掉所有已经存在则是other-main 由 main到所有worker,worker接收到直接进行踢出所有client连接就好了
最后程序收尾时 需要提供hook,同时也应对许多sockpair进行收尾处理 这样一来性能也比较好 各处也比较灵活 围绕main worker,每个线程提供消息来了 消息发出去的思想搞,每个线程都是一个Actor 每个线程只发自己的消息 只处理自己的消息,之间没有数据直接访问操作,除了部分的std::atomic