HappenLee opened this issue 4 years ago
@HappenLee I don't see the performance improvement from your POC. What's your detailed design?

Do you plan to still use `RowBlockV2` and `ColumnBlock`? Do you plan to still convert `RowBlockV2` to `RowBlock`? Why do we still use `VectorizedRowBatch`?
@HappenLee Vectorization is a good way to accelerate execution. However, in my view this will change the whole execution framework, so before the real code work we should discuss the design in more detail. Could you please explain your implementation more clearly?
At present, only the aggregation node has been verified. First, we convert tuples to ColumnVectors, and the time consumed by the conversion is recorded:
```cpp
// Declared outside the timing scope because the calls below use it.
int count = 0;
{
    Timer time;
    FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
        // Hoist lookups out of non-null branch to speed up non-null case.
        TupleRow* in_row = in_batch_iter.get();
        // Gather this row's aggregate input values into per-column vectors.
        for (int i = 0; i < agg_fn_evals_.size(); ++i) {
            void** values = static_cast<void**>(valuesVector[i].col_data());
            values[count] = agg_fn_evals_[i]->getAggValue(in_row);
        }
        // Gather this row's grouping keys into per-column vectors.
        for (int i = 0; i < ht_ctx->get_key_size(); ++i) {
            void** keys = static_cast<void**>(keysVector[i].col_data());
            keys[count] = ht_ctx->get_key(in_row, i);
        }
        ++count;
    }
    _cost_time += time.stop();
}
// Hash all gathered keys at once, then update each aggregate function
// over the whole column of gathered values.
EvalAndHashPrefetchGroup(keysVector, count, ht_ctx, remaining_capacity, tuples);
for (int i = 0; i < agg_fn_evals_.size(); ++i) {
    UpdateTuple(agg_fn_evals_[i], tuples,
                static_cast<void**>(valuesVector[i].col_data()), count);
}
```
The test query:

```sql
select max(C_PHONE) from customer group by C_MKTSEGMENT;
```
Counting the aggregation time, we find that after deducting the time spent on row-to-column conversion, the time is cut in half:
| Statistic | origin | row convert to column |
|---|---|---|
| Time | 4.19 sec | 2.47 sec |
There are three important parts: rewriting the scan node, rewriting expression evaluation, and rewriting the aggregation node.
We can make a judgment during query planning: if the query is on a dup_key table and only contains simple expressions, the vectorized interface will be called. The pseudo code is as follows:
```cpp
// Read a columnar batch from storage, evaluate the simple expressions
// directly on it, then convert only the selected rows to a row batch.
VectorizedBatch vbatch = _reader->next_batch_with_aggregation();
vector<rowId> selectedRowId = Expr.eval(vbatch);
convertVecBatchToRowBatch(vbatch, selectedRowId, output_rowbatch);
```
If there are other expressions that are not supported yet, we continue to use the old interface. The inputs of the new and old interfaces differ (one takes rows, the other a vbatch), but the output is the same: both produce the original RowBatch of the query layer.
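A minimal sketch of how this dispatch could look, reusing the names from the pseudo code above; everything else here (`_use_vectorized_path`, `next_row_batch`, `eval_conjuncts_row_by_row`) is a hypothetical placeholder, not the actual implementation:

```cpp
// Hedged sketch only. Both branches fill the same row-format output
// batch, so downstream nodes are unaffected by which path ran.
Status ScanNode::get_next(RowBatch* output_rowbatch) {
    if (_use_vectorized_path) {
        // New path: stay columnar through scan and expression
        // evaluation; convert the surviving rows to row format once.
        VectorizedBatch vbatch = _reader->next_batch_with_aggregation();
        std::vector<rowId> selectedRowId = Expr::eval(vbatch);
        convertVecBatchToRowBatch(vbatch, selectedRowId, output_rowbatch);
    } else {
        // Old path: the reader materializes rows, and unsupported
        // expressions are evaluated row by row as before.
        RETURN_IF_ERROR(_reader->next_row_batch(output_rowbatch));
        eval_conjuncts_row_by_row(output_rowbatch);
    }
    return Status::OK();
}
```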
So far, the changes to the scan part and the expression part are basically complete. We can now rewrite the aggregation node (see the sketch after this list):

- Design and implement a new hash table structure that is compatible with the column storage model
- Rewrite the computing model of the corresponding aggregation operators to support vectorization
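To make the intent concrete, here is a self-contained toy of a column-friendly `max(value) group by key` loop. It is not the Doris hash table design, just an illustration of the two-phase probe-then-update pattern already used in the POC above:

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

// Toy sketch: compute max(value) group by key over two column vectors.
// A real implementation would use a purpose-built open-addressing table
// and batched probing to stay cache friendly.
std::unordered_map<int32_t, int64_t> hash_group_max(
        const std::vector<int32_t>& key_col,
        const std::vector<int64_t>& value_col) {
    std::unordered_map<int32_t, int64_t> groups;
    std::vector<int64_t*> slots(key_col.size());
    // Phase 1: probe/insert the whole key column in one tight loop and
    // remember the aggregate slot for every row. (References into
    // std::unordered_map stay valid across inserts.)
    for (size_t i = 0; i < key_col.size(); ++i) {
        slots[i] = &groups.emplace(key_col[i], INT64_MIN).first->second;
    }
    // Phase 2: update the aggregate state from the value column.
    for (size_t i = 0; i < value_col.size(); ++i) {
        if (value_col[i] > *slots[i]) *slots[i] = value_col[i];
    }
    return groups;
}
```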
After all of that, the final goal of the first phase is to vectorize aggregate queries on a single table. If there is a multi-table join or a sort node, the old execution logic is still used.
Later, we can gradually extend vectorized execution to other execution nodes. It's going to be a long-term plan.
@kangkaisen
RowBlock is an old definition. At present, the storage layer and the query layer still use the RowBlock interface to pass data, and the query layer accesses the data inside a RowBlock through a RowCursor. Data in a RowBlock is stored row by row.

RowBlockV2 is a new, vectorized definition. It contains multiple ColumnBlocks, and each ColumnBlock holds one column of data; RowBlockV2 is effectively the newly defined VectorizedRowBatch. New Segment V2 data is read in RowBlockV2 format, and the BetaRowsetReader converts RowBlockV2 to RowBlock to return the data.
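A simplified sketch of how these types relate (field names are illustrative, not the actual Doris declarations):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Illustration only -- not the real Doris definitions.
struct ColumnBlock {
    uint8_t* data = nullptr;         // contiguous values of one column
    uint8_t* null_bitmap = nullptr;  // null flags for the same rows
    size_t num_rows = 0;
};

// RowBlockV2 is column-oriented: one ColumnBlock per column.
struct RowBlockV2 {
    std::vector<ColumnBlock> columns;
    size_t num_rows = 0;
};

// The old RowBlock stores the same rows contiguously row by row and is
// walked with a RowCursor; converting RowBlockV2 into it (as the
// BetaRowsetReader does today) is the overhead we want to remove.
```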
The Reader fetches one row at a time through its internal collect iterator. The collect iterator merges all RowsetReaders and returns data row by row, and the merge operations are based on RowCursor and RowBlock, so the Reader needs to be rewritten first.
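For reference, a minimal toy model of that row-by-row k-way merge (the real code works on RowCursor/RowBlock; `RowSource` here is a stand-in for one sorted RowsetReader):

```cpp
#include <queue>
#include <vector>

// Stand-in for one sorted rowset.
struct RowSource {
    std::vector<int> rows;  // already sorted ascending
    size_t pos = 0;
    bool exhausted() const { return pos >= rows.size(); }
    int current() const { return rows[pos]; }
};

// Min-heap merge returning one row at a time. Because the comparator
// works on individual rows, every row must exist in row format before
// the merge -- exactly what blocks a purely columnar read path.
std::vector<int> merge_row_by_row(std::vector<RowSource> sources) {
    auto cmp = [](const RowSource* a, const RowSource* b) {
        return a->current() > b->current();
    };
    std::priority_queue<RowSource*, std::vector<RowSource*>, decltype(cmp)>
            heap(cmp);
    for (auto& s : sources) {
        if (!s.exhausted()) heap.push(&s);
    }
    std::vector<int> merged;
    while (!heap.empty()) {
        RowSource* top = heap.top();
        heap.pop();
        merged.push_back(top->current());
        ++top->pos;
        if (!top->exhausted()) heap.push(top);
    }
    return merged;
}
```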
Doris currently has the dup_key, agg_key, and unique_key storage models. dup_key does not need aggregation logic, so we can transform this part first and deliver the data directly to the upper layer through ColumnVectors rather than a RowCursor. For agg_key and unique_key, pre-aggregation and other operations are performed by heap sort, so it seems difficult to avoid the row/column conversion there for now.
Motivation
At present, the underlying storage in Doris is columnar, but query execution is row-based, so data must first go through a column-to-row conversion before it reaches the query layer. Such an implementation may cause performance problems.

So we want to transform the query layer of Doris to vectorized execution, so that data is not only stored by columns but also processed by vectors (parts of columns), which allows high CPU efficiency. This can benefit query performance.
Here I implemented a simple POC to verify whether there is a performance improvement.
Test environment:

Data set: Star Schema Benchmark

Data generation: Download the SSBM code from GitHub and compile it. After the compilation succeeds, run the generator to produce 30 million (3000W) rows of customer data.

Build table and data import: Create a test table and import the data in customer.tbl into Doris; the data size is about 3.2 GB.
Environment:

- Intel(R) Xeon(R) CPU E5-2650 v4 @ 2.20GHz
- 2 physical CPU package(s), 24 physical CPU core(s), 48 logical CPU(s)
- Identifier: Intel64 Family 6 Model 79 Stepping 1
- ProcessorID: F1 06 04 00 FF FB EB BF
- Context Switches/Interrupts: 12174692729137 / 297015608902
- Memory: 119.5 GiB / 125.9 GiB