#!/usr/bin/env python
import unittest
import time
from tornado import ioloop
class TestIOLoop(unittest.TestCase):
def setUp(self):
self.loop = ioloop.IOLoop()
def tearDown(self):
pass
def _callback(self):
self.called = True
self.loop.stop()
def _schedule_callback(self):
self.loop.add_callback(self._callback)
# Scroll away the time so we can check if we woke up immediately
self._start_time = time.time()
self.called = False
def test_add_callback(self):
self.loop.add_timeout(time.time(), self._schedule_callback)
self.loop.start() # Set some long poll timeout so we can check wakeup
self.assertAlmostEqual(time.time(), self._start_time, places=2)
self.assertTrue(self.called)
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s:%(msecs)03d %(levelname)-8s %(name)-8s %(message)s', datefmt='%H:%M:%S')
unittest.main()
IOLoop
函数说明
__init__
IOLoop声明的时候有三种状态:READ, WRITE, ERROR。
_impl 根据操作系统有三种不同的支持方式,linux2.5.44以上支持epoll,free bsd和mac支持kqueue,windows支持select。__impl_在这里是对事件机制的一个封装,_register_注册事件,_modify_修改事件,_unregister_销毁事件,_poll_轮询事件.
初始化实例的时候添加一个__readwaker,伪造一次IO事件,这样做的好处是以便事件循环阻塞而没有相应描述符出现,需要在最大timeout时间之前返回,就可以向这个管道发送一个字符,用来终止阻塞在监听阶段的事件循环监听函数。
instance
任意的一条线程共享IOLoop对象
add_handler
_self._handlers_是一个字典,其中fd作为键, handler作为值。对_self._impl_注册fd和events状态。
update_handler
对事件状态进行更新,调用_self.impl.modify
start
一个while循环,将_self._callbacks_里边的callback遍历执行,执行完callback再进行下一次的事件迭代。通过_self._impl.poll_取出事件和相应的handler,执行该事件。
stop
通过_self._read_waker_将_self._stopped_设置为False, 终止while循环。
通过test case 进行流程分析