Closed yiippee closed 4 years ago
感谢你开源的高性能通信框架啊,谢谢。 在看源码的时候,我的理解是doPendingFunc函数的执行是通过eventfd来触发的,这样可以统一通过epollWait来调度执行,但有两个疑问: 1,为什么要统一通过epollWait来统一调度呢,不可以想写数据的时候直接就往connfd中write呢?这样会有什么问题呢?谢谢。 2,
func (l *EventLoop) doPendingFunc() { l.mu.Lock() pf := l.pendingFunc l.pendingFunc = nil l.mu.Unlock() length := len(pf) for i := 0; i < length; i++ { // 遍历所有的pengdingFunc,并执行 pf[i]() } }
doPendingFunc 这个函数会遍历所有的pendingFunc并执行,那么只要一个pendingFunc写入的eventfd被调度了,那么就会执行所有的pendingFunc,而且定时器也会定时触发这个调度。这样感觉会导致一些异步任务的执行并不是由这个任务本身去触发的,而是由其他的任务触发的,而且还会有多次write(eventfd)后,多次read(eventfd)没必要的情况(因为pengdingFunc已经全部执行完了)。 请问是我哪里理解错了吗?谢谢。
因为可能存在这样一种情况,对方接受数据缓慢,无法继续向内核缓冲区写入数据,此时 write buffer 里会缓存此数据。这时候直接往 connfd 中 write 写入,会导致消息乱序。
而且定时器也会定时触发这个调度。
定时器的实现和这个 eventfd 没有绑定啊,不太理解你的意思。
而且还会有多次write(eventfd)后,多次read(eventfd)没必要的情况(因为pengdingFunc已经全部执行完了)
这个是的,确实会有 多次 write,read eventfd 的情况,如果批量处理,又会造成延时,目前我没有想到相对完美的方案,如果你有想法,可以提出来,或者直接 PR 。 (PS:eventfd 每次读写8字节,这个开销相对还是比较小的)
感谢你开源的高性能通信框架啊,谢谢。 在看源码的时候,我的理解是doPendingFunc函数的执行是通过eventfd来触发的,这样可以统一通过epollWait来调度执行,但有两个疑问: 1,为什么要统一通过epollWait来统一调度呢,不可以想写数据的时候直接就往connfd中write呢?这样会有什么问题呢?谢谢。 2,
func (l *EventLoop) doPendingFunc() { l.mu.Lock() pf := l.pendingFunc l.pendingFunc = nil l.mu.Unlock() length := len(pf) for i := 0; i < length; i++ { // 遍历所有的pengdingFunc,并执行 pf[i]() } }
doPendingFunc 这个函数会遍历所有的pendingFunc并执行,那么只要一个pendingFunc写入的eventfd被调度了,那么就会执行所有的pendingFunc,而且定时器也会定时触发这个调度。这样感觉会导致一些异步任务的执行并不是由这个任务本身去触发的,而是由其他的任务触发的,而且还会有多次write(eventfd)后,多次read(eventfd)没必要的情况(因为pengdingFunc已经全部执行完了)。 请问是我哪里理解错了吗?谢谢。
- 因为可能存在这样一种情况,对方接受数据缓慢,无法继续向内核缓冲区写入数据,此时 write buffer 里会缓存此数据。这时候直接往 connfd 中 write 写入,会导致消息乱序。
而且定时器也会定时触发这个调度。
定时器的实现和这个 eventfd 没有绑定啊,不太理解你的意思。而且还会有多次write(eventfd)后,多次read(eventfd)没必要的情况(因为pengdingFunc已经全部执行完了)
这个是的,确实会有 多次 write,read eventfd 的情况,如果批量处理,又会造成延时,目前我没有想到相对完美的方案,如果你有想法,可以提出来,或者直接 PR 。 (PS:eventfd 每次读写8字节,这个开销相对还是比较小的)感谢你开源的高性能通信框架啊,谢谢。 在看源码的时候,我的理解是doPendingFunc函数的执行是通过eventfd来触发的,这样可以统一通过epollWait来调度执行,但有两个疑问: 1,为什么要统一通过epollWait来统一调度呢,不可以想写数据的时候直接就往connfd中write呢?这样会有什么问题呢?谢谢。 2,
func (l *EventLoop) doPendingFunc() { l.mu.Lock() pf := l.pendingFunc l.pendingFunc = nil l.mu.Unlock() length := len(pf) for i := 0; i < length; i++ { // 遍历所有的pengdingFunc,并执行 pf[i]() } }
doPendingFunc 这个函数会遍历所有的pendingFunc并执行,那么只要一个pendingFunc写入的eventfd被调度了,那么就会执行所有的pendingFunc,而且定时器也会定时触发这个调度。这样感觉会导致一些异步任务的执行并不是由这个任务本身去触发的,而是由其他的任务触发的,而且还会有多次write(eventfd)后,多次read(eventfd)没必要的情况(因为pengdingFunc已经全部执行完了)。 请问是我哪里理解错了吗?谢谢。
- 因为可能存在这样一种情况,对方接受数据缓慢,无法继续向内核缓冲区写入数据,此时 write buffer 里会缓存此数据。这时候直接往 connfd 中 write 写入,会导致消息乱序。
而且定时器也会定时触发这个调度。
定时器的实现和这个 eventfd 没有绑定啊,不太理解你的意思。而且还会有多次write(eventfd)后,多次read(eventfd)没必要的情况(因为pengdingFunc已经全部执行完了)
这个是的,确实会有 多次 write,read eventfd 的情况,如果批量处理,又会造成延时,目前我没有想到相对完美的方案,如果你有想法,可以提出来,或者直接 PR 。 (PS:eventfd 每次读写8字节,这个开销相对还是比较小的)
谢谢你的回答啊。 关于2,
func (s *Server) Start() {
s.server.RunEvery(1*time.Second, s.RunPush) // 启动定时器
s.server.Start()
}
func (s *Server) RunPush() {
var next *list.Element
s.mu.RLock()
defer s.mu.RUnlock()
for e := s.conn.Front(); e != nil; e = next {
next = e.Next()
c := e.Value.(*connection.Connection)
_ = c.Send([]byte("hello\n")) // 定时任务周期性 write eventfd
}
}
定时器也会调用send,最终也是触发write eventfd的。 我是觉得异步任务的执行并不是由这个任务本身去触发的,而是有可能由其他的任务触发,而且是在不同goroutine中,这样感觉怪怪的。但应该不会出错,因为任务提交了本身就想要立刻执行,即使不是自己触发执行。eventfd是全局唯一的,感觉无法区分不同的连接,所以没办法单独处理异步任务。还有个疑问,eventfd读写的那8个字节有什么意义呢?好像没存储信息,是类似于信号量吗?谢谢。 感觉读写8字节的性能开销比较小,只是多次无意义触发epollWait这个系统调用,感觉就比较大了啊。
之所以需要通过 eventfd 唤醒 epoll 来发送,主要是因为线程安全问题。
eventfd 是每个 poller 一个,是用来唤醒 epoll 的。
eventfd 相关可以搜索看看,大多数用来实现事件通知的,gev 用来唤醒 epoll。 https://linux.die.net/man/2/eventfd
你好,epoll_ctl是多线程安全的,所以你说的 “主要是因为线程安全问题。”,应该是没问题的吧。事件通知时,直接对这个connfd注册写事件和写回调函数(就是doPendingFunc)。这样会有问题吗?谢谢。 不过我看很多网络库都是用eventfd的☺
你好,epoll_ctl是多线程安全的,所以你说的 “主要是因为线程安全问题。”,应该是没问题的吧。事件通知时,直接对这个connfd注册写事件和写回调函数(就是doPendingFunc)。这样会有问题吗?谢谢。 不过我看很多网络库都是用eventfd的☺
是 ringbuffer 不是线程安全的,在 io 线程(协程)里会收发数据(操作ringbuffer),如果允许在别的协程同步发送数据,必然需要去处理 ringbuffer 的竞争问题,个人觉得这会使代码非常复杂。
你好,epoll_ctl是多线程安全的,所以你说的 “主要是因为线程安全问题。”,应该是没问题的吧。事件通知时,直接对这个connfd注册写事件和写回调函数(就是doPendingFunc)。这样会有问题吗?谢谢。 不过我看很多网络库都是用eventfd的☺
是 ringbuffer 不是线程安全的,在 io 线程(协程)里会收发数据(操作ringbuffer),如果允许在别的协程同步发送数据,必然需要去处理 ringbuffer 的竞争问题,个人觉得这会使代码非常复杂。
不对啊,你注册写事件后,也是由epollWait调度的啊,epollWait统一处理是单线程啊,epollWait回调写的时候,看ringbuffer是否有数据,有就把这次要写的追加,再从头开始写。感觉没啥问题啊。难道我哪里又理解错了吗?谢谢。
你好,epoll_ctl是多线程安全的,所以你说的 “主要是因为线程安全问题。”,应该是没问题的吧。事件通知时,直接对这个connfd注册写事件和写回调函数(就是doPendingFunc)。这样会有问题吗?谢谢。 不过我看很多网络库都是用eventfd的☺
是 ringbuffer 不是线程安全的,在 io 线程(协程)里会收发数据(操作ringbuffer),如果允许在别的协程同步发送数据,必然需要去处理 ringbuffer 的竞争问题,个人觉得这会使代码非常复杂。
不对啊,你注册写事件后,也是由epollWait调度的啊,epollWait统一处理是单线程啊,epollWait回调写的时候,看ringbuffer是否有数据,有就把这次要写的追加,再从头开始写。感觉没啥问题啊。难道我哪里又理解错了吗?谢谢。
是啊,你不是想在别的协程里 直接 write(fd, data)
, 而不是通知 epollWait统一处理的线程 去 write ?
你好,epoll_ctl是多线程安全的,所以你说的 “主要是因为线程安全问题。”,应该是没问题的吧。事件通知时,直接对这个connfd注册写事件和写回调函数(就是doPendingFunc)。这样会有问题吗?谢谢。 不过我看很多网络库都是用eventfd的☺
是 ringbuffer 不是线程安全的,在 io 线程(协程)里会收发数据(操作ringbuffer),如果允许在别的协程同步发送数据,必然需要去处理 ringbuffer 的竞争问题,个人觉得这会使代码非常复杂。
不对啊,你注册写事件后,也是由epollWait调度的啊,epollWait统一处理是单线程啊,epollWait回调写的时候,看ringbuffer是否有数据,有就把这次要写的追加,再从头开始写。感觉没啥问题啊。难道我哪里又理解错了吗?谢谢。
是啊,你不是想在别的协程里 直接
write(fd, data)
, 而不是通知 epollWait统一处理的线程 去 write ?
不是。是不想通过eventfd去通知,而是直接对connfd注册可写事件和写回调。这样就避免了上面提到的多次无意义的的唤醒eventfd了。 你上面提到的 :
而且还会有多次write(eventfd)后,多次read(eventfd)没必要的情况(因为pengdingFunc已经全部执行完了) 这个是的,确实会有 多次 write,read eventfd 的情况,如果批量处理,又会造成延时,目前我没有想到相对完美的方案,如果你有想法,可以提出来,或者直接 PR 。 (PS:eventfd 每次读写8字节,这个开销相对还是比较小的)
我明白你的意思了,你这种想法感觉也可行,但是会不会带来问题,能带来多少性能提升,还需要实际测试下。
func (c *Connection) HandleEvent(fd int, events poller.Event) {
if events&poller.EventErr != 0 {
c.handleClose(fd)
return
}
if c.outBuffer.Length() != 0 {
// 如果可读事件来了,但是outbuffer中有数据,又没法读,会一直死循环这里啊。
if events&poller.EventWrite != 0 {
c.handleWrite(fd)
}
} else if events&poller.EventRead != 0 {
c.handleRead(fd)
}
}
func (c *Connection) HandleEvent(fd int, events poller.Event) { if events&poller.EventErr != 0 { c.handleClose(fd) return } if c.outBuffer.Length() != 0 { // 如果可读事件来了,但是outbuffer中有数据,又没法读,会一直死循环这里啊。 if events&poller.EventWrite != 0 { c.handleWrite(fd) } } else if events&poller.EventRead != 0 { c.handleRead(fd) } }
不会死循环,此处主要是为了防止客户端接受缓慢(或者只发送,不接受数据)导致内存暴涨,所以优先发送完数据,再去读取数据。
@yiippee 你的那个想法 不想通过eventfd去通知,而是直接对connfd注册可写事件和写回调
我拉个新分支,咱们实现测测看? 有兴趣一起搞吗?
@yiippee 你的那个想法
不想通过eventfd去通知,而是直接对connfd注册可写事件和写回调
我拉个新分支,咱们实现测测看? 有兴趣一起搞吗?
哈哈,可以啊,不过我大概改了下,echo可以正常运行。其实改动很少的,你可以看一下,我看你做了很多相关测试,应该更有经验,跑你原来的测试用例应该差不多了,谢谢啊。
connection/connection.go
// Send 用来在非 loop 协程发送
func (c *Connection) Send(buffer []byte) error {
if !c.connected.Get() {
return errors.New("connection closed")
}
c.outBuffer.Write(buffer) // 准备好数据,这里需要对ringbuffer加锁
c.loop.EnableWrite(c.fd) // 装载写事件
// c.loop.QueueInLoop(func() {
// c.sendInLoop(c.protocol.Packet(c, buffer))
// })
return nil
}
example/echo.go
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
// log.Println("OnMessage")
// out = data
go func() {
time.Sleep(3 * time.Second) // 阻塞
c.Send([]byte("hello\n"))
}()
return
}
哈哈,可以啊,不过我大概改了下,echo可以正常运行。其实改动很少的,你可以看一下,我看你做了很多相关测试,应该更有经验,跑你原来的测试用例应该差不多了,谢谢啊。
connection/connection.go
// Send 用来在非 loop 协程发送 func (c *Connection) Send(buffer []byte) error { if !c.connected.Get() { return errors.New("connection closed") } c.outBuffer.Write(buffer) // 准备好数据,这里需要对ringbuffer加锁 c.loop.EnableWrite(c.fd) // 装载写事件 // c.loop.QueueInLoop(func() { // c.sendInLoop(c.protocol.Packet(c, buffer)) // }) return nil }
example/echo.go
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) { // log.Println("OnMessage") // out = data go func() { time.Sleep(3 * time.Second) // 阻塞 c.Send([]byte("hello\n")) }() return }
对 ringbuffer 加锁会导致IO线程变慢的,因为所有的客户端读写操作都在 IO 线程中进行。
原本想法是为 每一个连接的客户端维护一个 pendingFunc。今天想了下,觉得还是不用 epoll_ctl 来唤醒比较好,因为这样也不能解决重复唤醒的问题,也不能带来额外的益处,反而会让代码更复杂。
对现有的 eventloop 增加状态,可以通过这个状态来避免重复唤醒。具体做法: https://github.com/Allenxuxu/gev/commit/22a5565d2bcdb8c459f8bfcf7ab9ffe4e69b9754
有空可以看看,如果觉得有什么不妥之处,直接与我沟通😊
c.outBuffer.Write(buffer) // 准备好数据,这里需要对ringbuffer加锁 c.loop.EnableWrite(c.fd) // 装载写事件
1,不是所有客户端啊,只有其中的一个connfd的outbuffer才需要锁啊,而且一般来说具体的一个connfd也不会特别频繁执行异步任务,可以考虑乐观锁。另,eventloop.go中才是所有客户端共用一把锁吧:
l.mu.Lock()
l.pendingFunc = append(l.pendingFunc, f)
l.mu.Unlock()
2,
func (l *EventLoop) handlerEvent(fd int, events poller.Event) {
l.eventHandling.Set(true)
if fd != -1 {
s, ok := l.sockets.Load(fd)
if ok {
s.(Socket).HandleEvent(fd, events) // 这里是非eventfd执行
}
}
l.eventHandling.Set(false)
l.doPendingFunc() // 执行eventfd回调
}
不太理解用于标志是否已经触发了eventfd的eventHanding这样用有什么效果,是位置放错了吗?eventfd是全局的,来控制所有的异步任务,感觉通过eventHanding没法避免重复唤醒。我没有仔细测试啊。
所有 connfd 的读写操作 都是在 eventloop 中做的, 并非一个 conn 一个协程。
这样如果 epoll_wait 已经返回,并且还没有走到 l.doPendingFunc()
就不需要去唤醒 epoll 了。
- 所有 connfd 的读写操作 都是在 eventloop 中做的, 并非一个 conn 一个协程。
啊,有道理,感觉这是主要原因,我之前没觉察到啊
- 这样如果 epoll_wait 已经返回,并且还没有走到
l.doPendingFunc()
就不需要去唤醒 epoll 了。
之前是
else {
l.doPendingFunc()
}
为啥现在改成了不管怎样都执行一把呢?
为了 减少 唤醒的次数啊,如果当前 epoll 正在处理 io 事件,另一个协程 send 了一个信息,那边io事件处理完就直接 doPendingFunc
。
为了 减少 唤醒的次数啊,如果当前 epoll 正在处理 io 事件,另一个协程 send 了一个信息,那边io事件处理完就直接
doPendingFunc
。
你现在做法是不管fd是不是eventfd,都需要做一次doPendingFunc,而且doPendingFunc里面还有锁操作。
是的,锁的粒度已经尽量小了。如果你有更好的处理方式,可以提出来。
是的,锁的粒度已经尽量小了。如果你有更好的处理方式,可以提出来。
感觉有点走偏了啊,本来eventfd和connfd事件本来就要区分的。 你之前说的“每一个连接的客户端维护一个 pendingFunc”感觉可以再考虑一下,pendingFunc本来也是属于具体的一个连接的
啊,唤醒的问题感觉还是没解决啊。。。
是的,锁的粒度已经尽量小了。如果你有更好的处理方式,可以提出来。
感觉有点走偏了啊,本来eventfd和connfd事件本来就要区分的。 你之前说的“每一个连接的客户端维护一个 pendingFunc”感觉可以再考虑一下,pendingFunc本来也是属于具体的一个连接的
啊,唤醒的问题感觉还是没解决啊。。。
原来的那种方式并不能解决问题啊, 而且会提高唤醒的次数,因为 每次 wake 都只能唤醒自己, 现在一次唤醒还可以把别的 pendingFunc 也执行了。
是的,谢谢耐心回答。不知道比较出名的库是怎么做的,比如libevent之类的,没看过啊。。。 这个issue可以关了。
是的,谢谢耐心回答。不知道比较出名的库是怎么做的,比如libevent之类的,没看过啊。。。 这个issue可以关了。
大多数网络库,都会通过 eventfd 或者 pipe 来做唤醒的。
今天突然想了一下,是否可以利用eventfd中读到的信号个数(n, err := unix.Read(ep.eventFd, buf) 这个buf)来判定具体有多少个pendingfun需要执行,这样就可以避免这把锁了?这样感觉就把所有信息利用起来了,你目前的做法好像把这个信息丢了。我总觉得epoll线程不应该有任何锁的。
l.mu.Lock()
pf := l.pendingFunc
l.pendingFunc = nil
l.mu.Unlock()
不知道这样做会有什么问题啊?谢谢。
今天突然想了一下,是否可以利用eventfd中读到的信号个数(n, err := unix.Read(ep.eventFd, buf) 这个buf)来判定具体有多少个pendingfun需要执行,这样就可以避免这把锁了?这样感觉就把所有信息利用起来了,你目前的做法好像把这个信息丢了。我总觉得epoll线程不应该有任何锁的。
l.mu.Lock() pf := l.pendingFunc l.pendingFunc = nil l.mu.Unlock()
不知道这样做会有什么问题啊?谢谢。
不可以。
pendingFunc append 会有扩容的情况的,肯定需要锁的。
你好,你目前doPendingFunc()做法是怎么样都会执行一次的,减少eventfd唤醒是依赖读数据时肯定会去执行doPendingFunc,所以就不需要唤醒eventfd,但是读数据后开一个goroutine去执行还是会唤醒eventfd的。总感觉你这样的做法感觉怪怪的,没有将eventfd与普通fd区分开来。我找到muduo的做法,EventLoop.cc: 254
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true; // 设置正在执行pendingFunc
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}
这种做法是否更好一些,或者你有其他的原因呢?谢谢。
你好,你目前doPendingFunc()做法是怎么样都会执行一次的,减少eventfd唤醒是依赖读数据时肯定会去执行doPendingFunc,所以就不需要唤醒eventfd,但是读数据后开一个goroutine去执行还是会唤醒eventfd的。总感觉你这样的做法感觉怪怪的,没有将eventfd与普通fd区分开来。我找到muduo的做法,EventLoop.cc: 254
void EventLoop::doPendingFunctors() { std::vector<Functor> functors; callingPendingFunctors_ = true; // 设置正在执行pendingFunc { MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); } for (const Functor& functor : functors) { functor(); } callingPendingFunctors_ = false; }
这种做法是否更好一些,或者你有其他的原因呢?谢谢。
直接PR你的想法,我来看看
你好,你目前doPendingFunc()做法是怎么样都会执行一次的,减少eventfd唤醒是依赖读数据时肯定会去执行doPendingFunc,所以就不需要唤醒eventfd,但是读数据后开一个goroutine去执行还是会唤醒eventfd的。总感觉你这样的做法感觉怪怪的,没有将eventfd与普通fd区分开来。我找到muduo的做法,EventLoop.cc: 254
void EventLoop::doPendingFunctors() { std::vector<Functor> functors; callingPendingFunctors_ = true; // 设置正在执行pendingFunc { MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); } for (const Functor& functor : functors) { functor(); } callingPendingFunctors_ = false; }
这种做法是否更好一些,或者你有其他的原因呢?谢谢。
直接PR你的想法,我来看看
不好意思,又看了下muduo,muduo如果在新线程中发送也是肯定会唤醒eventfd的。你这种做法是对的
感谢你开源的高性能通信框架啊,谢谢。 在看源码的时候,我的理解是doPendingFunc函数的执行是通过eventfd来触发的,这样可以统一通过epollWait来调度执行,但有两个疑问: 1,为什么要统一通过epollWait来统一调度呢,不可以想写数据的时候直接就往connfd中write呢?这样会有什么问题呢?谢谢。 2,
doPendingFunc 这个函数会遍历所有的pendingFunc并执行,那么只要一个pendingFunc写入的eventfd被调度了,那么就会执行所有的pendingFunc,而且定时器也会定时触发这个调度。这样感觉会导致一些异步任务的执行并不是由这个任务本身去触发的,而是由其他的任务触发的,而且还会有多次write(eventfd)后,多次read(eventfd)没必要的情况(因为pengdingFunc已经全部执行完了)。 请问是我哪里理解错了吗?谢谢。