snower / TorMySQL

The highest performance asynchronous MySQL driver by PyMySQL
MIT License
308 stars 63 forks source link

想请教tornado是不是异步非阻塞的问题? #28

Closed fatpo closed 6 years ago

fatpo commented 6 years ago

大神你好。 0、 确实是走投无路,周围也没人研究tornado可问,谷歌又用不好,思索再三才来麻烦大神。 哎,真的不知道tornado怎么才是正确读源码的姿势。。。

1、 最近读到一个设计模式Reactor,感觉tornado就是这个模式。但是里面说Reactor是IO多路复用,同时也是异步阻塞。为什么阻塞呢,它说因为此模式还是用到select()这种系统调用,会阻塞在select上(linux下tornado是用epoll啦,应该差不多吧)。那么它到底是不是异步阻塞呢?

2、 如果它真的是Reactor模式,别人是分两个线程:用户线程+Reactor线程。 image

Reactor负责轮询,select返回后调用用户线程的handle_event(),但是我用tornado的时候,查看到只有一个线程,查看方法(top -pid yourpid),难道真的一个线程也能来轮询 + 注册handler之类的吗?感觉会一直卡在

Reactor::handle_events(){
    while(1){
        sockets = select();
        for(socket in sockets){
            get_event_handler(socket).handle_event();
        }
    }
}

它貌似只能在上述的while循环中处理每个EventHandler的handle_event(),应该做不了别的事了吧。

参考: http://blog.csdn.net/baixiaoshi/article/details/48708347 IO多路复用的小节

snower commented 6 years ago

1、其实无论是select还是epoll、kqueue,其实都不是真正的异步io,只能称为io多路复用,他本身在读取数据还是和阻塞模式是一样的,区别只是我用用select或者epoll向系统注册了io读写事件监听器,等真正我们去读取数据的时候数据已经准备好了,所以不会产生阻塞,如果你不等待系统io事件直接去读数据,其实还是很阻塞同步io一样的,而等待系统io读写事件通知又需要我们主动发起并且是阻塞式的,真正的异步应该是我告诉你需要什么东西,你准备好通知我,所以这是阻塞的。

那么为什么又会说异步呢?其实是这样的,在真正的实际使用中,我们会给socket创建的时候会设置一个NONBLOCK标记,这样如果我们没有在系统io告知的时候去读取数据,那么会立刻返回一个io错误表示已经没有数据了,这样线程不会阻塞在io读取时间从而可以释放出cpu去接着做其他事情,比如接着监听是否还有其他io读写事件。 https://github.com/tornadoweb/tornado/blob/master/tornado/iostream.py#L1094 你可看这。

之所以要这么设计,是因为在实际使用中,当io读取事件到达时我们并不知道有多少数据准备好了,但读取缓冲区是有限的,亦有可能在我们读取数据处理的过程中又有新的数据到达,现在都已经是万兆网了,所以我们并不能一次读取完所有数据,所以一般会循环读取多次知道io返回错误,从而告知我们可以继续处理其他io事件或者继续监听系统io。 https://github.com/tornadoweb/tornado/blob/master/tornado/iostream.py#L680 具体过程你可以在这看到。

当然异步还有第二层含义是,当我们通过监听系统io读写事件到达时,可以立刻就用当前线程去处理io读写事件,比如tornado就是这么做的,也可以把这个事件一个队列里,之后再调度这个队列去处理io读写,调度的过程可以仍然由这个线程完成也可以由其他线程完成,比如java的Reactor,go语言的协程实现。 https://github.com/tornadoweb/tornado/blob/master/tornado/ioloop.py#L921 具体过程你可以在这看到。

这两者的区别来说这两者并不大,cpu核心数总是有限的,而没有cpu核心同一时刻只能运行一个线程,如果在没有io阻塞产生的系统中,线程不会暂停始终占据cpu运行时间,那么已经是系统效率最大了,使用另外的线程调度io事件并不能加快系统运行效率,额外的线程还会导致更复杂的内存同步为题,所以一般第二种设计中,会使用一个线程监听select或epoll等待系统io事件,而用和cpu核心数相同的线程数处理io读写事件。

2、为什么一个线程可以既处理select监听系统io事件,handle_event,又可以发起注册handler,是因为所有需要做的事情都是以io事件来驱动的,select虽然会阻塞线程,但是现在同样也没有任何需要处理的事情,handle_event过程一般只是完成数据读取简单处理过程,所以这个过程会非常快,所以也不会影响其他事件的处理,而注册handler这个过程大部分情况会在handle_event的过程中发起,或者由外部的其他线程发起,所以虽然是个while死循环,但是仍然可以不停的处理各种事情。

假如在select还是其他普通事件没有处理完成,那么tornado会给select设置一个0的超时返回时间,那么select就不会再阻塞,此时并可以立刻接着处理其他事情。 假如在select阻塞的过程中需要处理超时事件,tornado的做法的会事先把超时按从小到大排序,选取小的超时时间作为select的超时等待返回时间,这样tornado就可以在超时事件到期的时候立刻开始处理。 https://github.com/tornadoweb/tornado/blob/f5df1e4df7ca149342957f71bf4f98710f6cccdf/tornado/ioloop.py#L859 具体过程你可以在这看到。

假如在select的时候系统有其他任务想处理,比如响应系统退出事件,或者其他线程有io操作需要处理,那么tornado可以使用waker唤醒线程返回,具体做法是事先创建一个pipe或本地socket注册到系统中,需要唤醒线程时往里边写一个字符,select并会收到事件并返回,此时就可以接着处理其他事情了,从而达到唤醒线程的目的。 https://github.com/tornadoweb/tornado/blob/3236f0704a2522a208d4d43afcecf2b8e6bfb538/tornado/platform/common.py#L27 具体过程你可以看这个waker的实现就知道了。

3、tornado只使用一个线程运行,除了上面所说cpu核心同一时刻只能运行一个线程,在没有io阻塞的情况下cpu已经是满负载了,以及使用单个线程可以不需要处理负载的内存同步过程,可以简化系统结构提供效率,同时tornado这样的以io事件驱动的系统一般面向的是比如推送、高并发web api、代理等io非常频繁非常多单cpu计算量很小的系统。

在这样的系统中,很多时候需要同时处理数万、数十万,甚至数百万的连接,如果以同步阻塞io的话,每个线程一个连接,但系统能创建的线程数是有上限的,系统的内存也是有限的,没创建一个线程只是栈就至少需要1MB,同时创建百万以上的线程还是很有困难的。

其次,系统再调度线程以及线程切换的过程中都是需要消耗资源的,在大量线程的系统中,线程调度和线程切换当会产生非常高的系统资源消耗,而使用单一线程处理所有连接事件将会大幅度提高系统性能。

tornado使用单一线程处理还有一个很重要的原因也是受python全局GIL锁的影响,统一时刻,只能有一个线程真实的在运行,所以在生产环境部署中,一般会创建和cpu核心数相同的进程来充分使用系统资源。

在tornado源码中:

ioloop.py 完成io事件循环,start函数就是io时间主循环过程了

iostream.py 完成io读写过程,_read_to_buffer就是核心取过程

这两个文件基本就是tornado的io核心过程了,platform下边还放了一些针对各平台的一些具体实现

fatpo commented 6 years ago

那么Waker的话,我大概理解大神的解释。 有一点疑问: 设置非阻塞可以理解,但是为什么要设置FD_CLOEXEC呢,据我所知,这个用在fork子进程然后有exec操作的时候会自动关闭子进程的fd(虽然不知道为何非要关闭,感觉父进程去关闭fd就好了呀,难不成还有类似python的GC的引用-1,如果没减少到0就无法回收?还是socket端口占用?),但是这里又不会fork。。为啥非要搞这个FD_CLOEXEC(close-on-exec)?

def set_close_exec(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

def _set_nonblocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

class Waker(interface.Waker):
    def __init__(self):
        r, w = os.pipe()
        _set_nonblocking(r)
        _set_nonblocking(w)
        set_close_exec(r)
        set_close_exec(w)
        self.reader = os.fdopen(r, "rb", 0)
        self.writer = os.fdopen(w, "wb", 0)

    def fileno(self):
        return self.reader.fileno()

    def write_fileno(self):
        return self.writer.fileno()

    def wake(self):
        try:
            self.writer.write(b"x")
        except IOError:
            pass

    def consume(self):
        try:
            while True:
                result = self.reader.read()
                if not result:
                    break
        except IOError:
            pass

    def close(self):
        self.reader.close()
        self.writer.close()