levy5307 / blog

https://levy5307.github.io/blog/
MIT License
0 stars 0 forks source link

Rdsn Task #24

Open levy5307 opened 4 years ago

levy5307 commented 4 years ago

https://levy5307.github.io/blog/rdsn-task/

一、global相关 service_spec: global specification 成员变量: std::vector threadpool_specs 保存所有threadpool的specification

成员函数: bool init() 1.从配置文件的[core] section读取配置参数 2.threadpool_spec::init(threadpool_spec)获取所有threadpool的specification –> threadpool_spec 3.task_spec::init()

二、threadpool相关 threadpool_code: threadpool_spec的index,内部只有一个int型_internal_code保存customizable id DEFINE_THREAD_POOL_CODE用于定义一个threadpool_code, 例如:代码中可以调用DEFINE_THREAD_POOL_CODE(THREAD_POOL_INVALID)去创建一个name=THREAD_POOL_INVALID的threadpool_code(根据name去注册customizable id)

threadpool_spec: thread pool specification static bool init(std::vector &specs) 读取配置中所有threadpool的specification, specs是输出参数

  1. 创建一个default_spec,从配置文件的[threadpool..default] section读取thread pool的默认参数
  2. 然后遍历所有的threadpool code, 根据threadpool code获取其name,然后根据default_spec, 以及从配置文件[threadpool.{name}] section获取具体的配置参数来好偶去spec, 放入所有的specs中 成员变量: std::string name thread_pool的名字(例如threadpool.THREAD_POOL_DEFAULT) dsn::threadpool_code pool_code threadpool_spec的index int worker_count 该threadpool的线程数量 worker_priority_t worker_priority 该threadpool的优先级

note: 目前只有task engine用到了thread pool

三、task相关 static std::array<std::unique_ptr, TASK_SPEC_STORE_CAPACITY> s_task_spec_store: 全局std::array, 保存所有的task_spec, 下标是task_code

task_code: 仅有一个int类型成员用于保存task code 构造时会调用task_spec::register_storage_task_code, 注册task code并创建相应的task_spec 可以使用DEFINE_TASK_CODE/DEFINE_TASK_CODE_RPC/DEFINE_TASK_CODE_AIO等宏去注册task code(并创建相应的task_spec), 其将会去创建相应的task_code 所以task_code和task_spec是在系统启动时候就创建好的,所以toollets可以将join point加入到所有的task_spec中

task_spec: 保存task specification(包括task code, 以及各种join point) static函数task_spec::register_storage_task_code; 用于注册task code并创建相应的task_spec, 如果task_type是TASK_TYPE_RPC_REQUEST,还要注册ack_code

task: 在创建时,会去s_task_spec_store中找相应的task_spec, task执行的不同阶段, 会去调用spec中不同的join_point.execute

task_worker: task线程,创建时为其指定一个task queue void start() 标记_is_running=true, 并创建一个线程运行run_internal()函数。最终线程进入loop()函数,不断从task queue中取出task执行task->exec_internal(根据task_worker_pool::_spec.dequeue_batch_size去batch size个) note: 由于queue有可能是共享的,所以queue的内部实现需要加锁

simple_timer_service::timer_service 定时任务服务 void add_timer(task *task) 将task加入到定时服务中 1.创建一个boost::asio::deadline_timer类型的timer 2.timer->expires_from_now(boost::posix_time::milliseconds(task->delay_milliseconds())) 设置delay时间 3.调用timer->async_wait, wait指定的delay时间. 同时指定一个callback,该callback将执行task->enqueue(timer_task::enqueue) timer_task::enqueue最终将改task放入到了task_worker_pool中的对应的task queue中,随后task_worker会异步的取出该task执行,执行完task的callback后将task的状态设置为READY,以便再次执行(task::exec_interval在运行完其exec()函数后,会判断其状态,如果不是Finish状态,则继续将task入队). 并将其delay时间为interval_milliseconds, 只是delay时间变为了timer_interval, 这样task::enqueue再次入队时,由于delay时间不为0,则调用add_timer,跳到了1。 note: 1.所以timer_task的执行最终也是放入到task_queue中,正如其名字,timer_service只是提供了一个定时服务,最终执行的还是task_worker_pool中定义的线程 2.这样看定时任务也不是特别精确准时

task_worker_pool: task线程池 成员变量: threadpool_spec _spec threadpool specification task_engine owner 指向task_engine的指针 service_node _node 指向service_node的指针 std::vector<task_worker > _workers 所有的task线程 std::vector<task_queue > _queues 所有的task_queue(每个分片一个queue) std::vector<timer_service *> _per_queue_timer_svcs 所有的定时任务服务(每个分片一个queue)

成员函数: void create() 紧跟在构造函数之后使用 1.如果_spec.partitioned等于true,分片数量qCount=_spec.worker_count(每个分片一个queue), 否则qCount=1. 创建qCount个queue, 放入_queues中 2.创建qCount个timer_service,放入_per_queue_timer_svcs 3.根据_spec.worker_count来创建task_worker,执行task_worker::create函数,并将所有worker放入_workers。 在创建task_worker时需要为其指定task_queue,如果是分片模式,则每个worker一个queue,否则所有的worker都指定同一个queue(因为只有一个) void start() 启动task_work_pool 1.启动所有的定时任务 2.启动所有的task_worker void add_timer(task t) 1.idx = (_spec.partitioned ? static_cast(t->hash()) % static_cast(_per_queue_timer_svcs.size()) : 0) 获取分片index, 如果_spec中指定是分片模式,则根据task t的hash值与定时队列的数量取模,否则就是0(此时只有一个定时任务queue) 2._per_queue_timer_svcs[idx]->add_timer(t) 向queue中插入该定时任务 void enqueue(task t) 将task t插入到task queue中 1.计算分片index 2.根据分片获取相应的queue,将task *t放入到queue中

task_engine: 每个service_node有且仅有一个task_engine, 在service_node::start函数中,会创建一个task_engine,并调用其create接口 成员变量: std::vector<task_worker_pool > _pools 所有的thread pool, 一个task_engine有多个thread pool volatile bool _is_running 是否正在运行 service_node node 指向service_node的指针,每个service都有一个task_engine 成员函数: void create(const std::list &pools) 创建所有的thread pool 遍历所有的threadpool_code, 1.获取改threadpool_code对应的threadpool_spec 2.根据threadpool_spec new一个task_worker_pool 3.调用workerPool->create() 4.将workerPool放入到_pools中 void start() 对所有的thread pool执行task_worker_pool::start

TODO:

其他: join_point: join_point里面包含一个advice_entry双向循环链表,可以通过put_back,put_front等函数向其中添加advice_entry,在调用execute时将所有advice_entry执行一遍。 可以用于错误注入等,向代码的执行中注入各种逻辑操作。例如task中就有各种join_point,用于在task运行时的不同阶段,注入不同的代码

toollets: 在配置文件中通过toollets定义,在service_api_c.cpp的run()函数中,会初始化toollets, 遍历所有的task_spec, install各种join point到task_spec中