levy5307 / blog

https://levy5307.github.io/blog/
MIT License
0 stars 0 forks source link

Doris数据读取流程 #80

Open levy5307 opened 7 months ago

levy5307 commented 7 months ago

https://levy5307.github.io/blog/doris-data-read/

Doris的底层数据读取是通过OlapScanNode发起的。当调用OlapScanNode::get_next时:

如果当前该OlapScanNode是非start状态时,则开始执行start_scan流程。

如果_materialized_row_batches为empty,并且传输没有完成时,则等待1秒钟之后继续查看,直到不为空

获取到row batch后,放入到返回参数row_batch中

前面提到,第一次执行get_next时,会触发start_scan流程:

根据conjuncts构建ColumnValueRange、olap filter以及scan keys

我们把过滤条件按照最外层的AND拆分之后的单元叫做conjunction,比如((A & B) | C) & D & E就是由((A & B) | C),D,E三个conjunction组成的。之所以这么定义是因为,conjunction是是否下推到存储的最小单元。一个conjunction里面的条件要么都下推,要么都不下推。

根据一系列参数获取ranges_per_scanner,并根据ranges_per_scanner以及scan ranges,构建一些OlapScanner,每个OlapScanner负责某一确定的tablet的部分ranges

对新创建出来的OlapScanner,分别执行prepare。在prepare中,获取tablet reader,并根据fe传递过来的数据version,获取[0, version]之间的所有rowset的reader

然后,创建一个线程,执行transfer_thread,该线程在后台异步的scan底层数据。

在transfer_thread中,首先根据_max_materialized_row_batches(最大物化batch数) / config::doris_scanner_row_num(一次read最多读取的行数) / state->batch_size()(一个batch的大小)得到执行scan的最大线程数量。然后在确保不超过前述最大线程数的前提下,给OlapScanner分配一个线程执行OlapScanNode::scanner_thread。

当transfer_thread中创建完scanner_thread后,等待_scan_row_batches中有row batch时,将row batch放入到_materialized_row_batches中,供OlapScanNode::get_next消费。

在scanner_thread中,首先执行OlapScanner::open操作,该函数主要对TabletReader进行初始化。主要包括初始化conditions(查询条件对象)、初始化bloom filter列集合(包含in、eq查询条件、或者bloom filter索引的列)、初始化DeleteHandler(主要用于判断row是否需要删除)