Open haitaoguan opened 1 year ago
ACK
T:-2.LEFT_JOIN_ON({T:-1},{T:-4,T:-5},C:1)
The problem is how Tianmu executes LEFT_JOIN_ON; changing the table order of the execution plan has little impact on the query time.
+----+-------------+---------------+------------+------+---------------+------+---------+------+---------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra |
+----+-------------+---------------+------------+------+---------------+------+---------+------+---------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 1 | PRIMARY | a | NULL | ALL | NULL | NULL | NULL | NULL | 1016 | 100.00 | NULL |
| 1 | PRIMARY | <derived3> | NULL | ALL | NULL | NULL | NULL | NULL | 325120 | 100.00 | Using where; Using join buffer (Block Nested Loop) |
| 1 | PRIMARY | xx | NULL | ALL | NULL | NULL | NULL | NULL | 3251200 | 100.00 | Using where; Using join buffer (Block Nested Loop) |
| 3 | DERIVED | c1am_acct_day | NULL | ALL | NULL | NULL | NULL | NULL | 3251200 | 10.00 | Using where with pushed condition (`mbs`.`c1am_acct_day`.`DELETED_FLAG` = '0')(t0) Pckrows: 50, susp. 50 (0 empty 0 full). Conditions: 1; Using temporary; Using filesort |
+----+-------------+---------------+------------+------+---------------+------+---------+------+---------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Adding the condition WHERE c.account_id IS NOT NULL guarantees that NULLs cannot appear in the result set, which converts the outer join into an inner join. The join conditions can then be pushed up or down as appropriate, and the query finishes in under 1 second.
But for the slow left join itself we still need to get to the root cause. If the problem is the nested loop, it needs to be redesigned.
This is another issue from a customer POC and it affects the customer's decision on whether to adopt Tianmu, so it must be resolved as soon as possible. This ticket continues the analysis of the problem; please handle it promptly to meet the customer's requirements.
Compared with the outer join in this SQL, running it as an inner join gives a dramatic query-performance improvement; this needs to be analyzed operator by operator at the physical-plan level.
Comparing with the inner-join execution plan, almost all conditions are pulled up, while the outer join ends up running a nested loop; because the materialized table is not clustered, the time complexity explodes.
Let me think this over. Such a huge gap between the outer join and the inner join is interesting, and the code here is actually quite clever.
Given the urgency of winning the contract in this customer POC, at the customer site we advised the customer to add null-rejecting semantics to their business SQL so that the outer join is converted into an inner join, in order to demonstrate join-query performance during the on-site demo.
While the POC to win the customer is going on, we will analyze the outer join thoroughly.
The compiled query plan itself matches expectations; the key issue is the execution of the physical plan.
void CompiledQuery::LeftJoinOn(const TabID &temp_table, std::vector<TabID> &left_tables,
std::vector<TabID> &right_tables, const CondID &cond_id) {
CompiledQuery::CQStep s;
s.type = StepType::LEFT_JOIN_ON;
s.t1 = temp_table;
s.c1 = cond_id;
s.tables1 = left_tables;
s.tables2 = right_tables;
steps.push_back(s);
}
Pay special attention here to how the inner join relates the tables compared with the outer join: the inner join handles the table lists differently.
void CompiledQuery::InnerJoinOn(const TabID &temp_table, std::vector<TabID> &left_tables,
std::vector<TabID> &right_tables, const CondID &cond_id) {
CompiledQuery::CQStep s;
s.type = StepType::INNER_JOIN_ON;
s.t1 = temp_table;
s.c1 = cond_id;
s.tables1 = left_tables;
s.tables1.insert(s.tables1.end(), right_tables.begin(), right_tables.end());
steps.push_back(s);
}
This code is written with some skill and is quite interesting. Even though relations in a relational database are not standard set theory but a bag model, this part of the code is well done; contributors reading the source are encouraged to study it.
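To make the contrast concrete, here is a minimal sketch (hypothetical, simplified types; no such rewrite exists in the engine today) of what a plan-level lowering would look like once a later predicate is known to reject NULLs on the right side: it simply reshapes the LEFT_JOIN_ON step into the same form that InnerJoinOn builds directly.

#include <vector>

// Hypothetical, simplified stand-ins for CQStep / TabID, for illustration only.
struct TabID { int n; };
enum class StepType { LEFT_JOIN_ON, INNER_JOIN_ON };
struct CQStep {
  StepType type;
  std::vector<TabID> tables1;  // INNER_JOIN_ON: all tables; LEFT_JOIN_ON: left side only
  std::vector<TabID> tables2;  // LEFT_JOIN_ON: the nullable right side
};

// If a later predicate (e.g. c.account_id IS NOT NULL) rejects NULLs on every
// table of the right side, the step can be lowered: merge tables2 into tables1,
// which is exactly the shape InnerJoinOn produces.
bool TryLowerLeftJoinToInner(CQStep &s, bool predicate_rejects_nulls_on_right) {
  if (s.type != StepType::LEFT_JOIN_ON || !predicate_rejects_nulls_on_right)
    return false;
  s.tables1.insert(s.tables1.end(), s.tables2.begin(), s.tables2.end());
  s.tables2.clear();
  s.type = StepType::INNER_JOIN_ON;
  return true;
}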
At physical execution time this join operation runs nested loops; what makes it trickier is that the inner join is not a single join operator but a scan-and-filter over a virtual table made up of multiple columns.
void TempTable::AddInnerConds(Condition *cond, std::vector<TabID> &dim_aliases) {
for (uint i = 0; i < cond->Size(); i++)
for (uint j = 0; j < dim_aliases.size(); j++) (*cond)[i].left_dims[GetDimension(dim_aliases[j])] = true;
filter.AddConditions(cond, CondType::ON_INNER_FILTER);
}
void TempTable::AddLeftConds(Condition *cond, std::vector<TabID> &left_aliases, std::vector<TabID> &right_aliases) {
for (uint i = 0; i < cond->Size(); i++) {
for (int d = 0; d < (*cond)[i].left_dims.Size(); d++) (*cond)[i].left_dims[d] = (*cond)[i].right_dims[d] = false;
for (uint j = 0; j < left_aliases.size(); j++) (*cond)[i].left_dims[GetDimension(left_aliases[j])] = true;
for (uint j = 0; j < right_aliases.size(); j++) (*cond)[i].right_dims[GetDimension(right_aliases[j])] = true;
}
filter.AddConditions(cond, CondType::ON_LEFT_FILTER);
}
if (false_desc_found && outer_dims.IsEmpty()) {
all_dims.Plus(cond[0].left_dims); // for FALSE join condition
// DimensionUsed() does not mark anything
for (int i = 0; i < mind->NumOfDimensions(); i++)
if (all_dims[i])
mind->Empty(i);
return; // all done
}
It can be seen that once T4 and T5 have both been inner-joined with T2, the subsequent condition filtering is applied only to T2.
The reason this is legal comes down to set semantics; a Venn diagram makes the idea easy to see.
By converting the join operator into a restriction, the expensive join processing is avoided entirely.
But that is easy to say; how exactly it is done, and how the outer-join semantics are handled, is the real question.
In this way a two-pass algorithm is turned into a one-pass algorithm.
The physical plan of a join operator is at least a two-pass algorithm; comparing with how InnoDB handles outer joins, the slowdown here is clearly due to the materialized table's data not being clustered.
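A tiny standalone example (not engine code) of why the null-rejecting predicate makes that conversion legal: the rows a LEFT JOIN adds on top of the INNER JOIN are exactly the NULL-padded ones, so a filter like c.account_id IS NOT NULL removes precisely those rows and nothing else.

#include <cstdio>
#include <optional>
#include <utility>
#include <vector>

int main() {
  std::vector<int> left = {1, 2, 3};   // driving table keys
  std::vector<int> right = {2, 3, 3};  // nullable-side keys

  // LEFT JOIN: every left row survives; unmatched ones get a NULL partner.
  std::vector<std::pair<int, std::optional<int>>> left_join;
  for (int l : left) {
    bool matched = false;
    for (int r : right)
      if (l == r) { left_join.push_back({l, r}); matched = true; }
    if (!matched) left_join.push_back({l, std::nullopt});  // NULL-padded row
  }

  // "right_key IS NOT NULL" drops exactly the padded rows; what remains is the
  // INNER JOIN result, which is what the customer-side workaround relies on.
  int inner_rows = 0;
  for (const auto &row : left_join)
    if (row.second.has_value()) ++inner_rows;

  std::printf("left join rows: %zu, after IS NOT NULL: %d\n", left_join.size(), inner_rows);  // 4 vs 3
}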
T:-2.LEFT_JOIN_ON({T:-1},{T:-4,T:-5},C:1)
#0 Tianmu::core::ParameterizedFilter::UpdateMultiIndex (this=0x7f25983959e0, count_only=false, limit=-1) at /root/work/stonedb-dev-202305026/storage/tianmu/core/parameterized_filter.cpp:1117
#1 0x0000000002cff56e in Tianmu::core::Query::Preexecute (this=0x7f28c1d507d0, qu=..., sender=0x7f25fd1212f0, display_now=true)
at /root/work/stonedb-dev-202305026/storage/tianmu/core/query.cpp:797
#2 0x0000000002ceba5e in Tianmu::core::Engine::Execute (this=0x53dbeb0, thd=0x7f25fc000e10, lex=0x7f25fc003138, result_output=0x7f25fc006ab8, unit_for_union=0x0)
at /root/work/stonedb-dev-202305026/storage/tianmu/core/engine_execute.cpp:513
#3 0x0000000002cea74b in Tianmu::core::Engine::HandleSelect (this=0x53dbeb0, thd=0x7f25fc000e10, lex=0x7f25fc003138, result=@0x7f28c1d50dc8: 0x7f25fc006ab8, setup_tables_done_option=0,
res=@0x7f28c1d50dc4: 0, is_optimize_after_tianmu=@0x7f28c1d50dbc: 1, tianmu_free_join=@0x7f28c1d50dc0: 1, with_insert=0)
at /root/work/stonedb-dev-202305026/storage/tianmu/core/engine_execute.cpp:243
#4 0x0000000003084438 in Tianmu::DBHandler::ha_my_tianmu_query (thd=0x7f25fc000e10, lex=0x7f25fc003138, result_output=@0x7f28c1d50dc8: 0x7f25fc006ab8, setup_tables_done_option=0,
res=@0x7f28c1d50dc4: 0, is_optimize_after_tianmu=@0x7f28c1d50dbc: 1, tianmu_free_join=@0x7f28c1d50dc0: 1, with_insert=0)
at /root/work/stonedb-dev-202305026/storage/tianmu/sql/ha_my_tianmu.cpp:95
#5 0x0000000002427a28 in execute_sqlcom_select (thd=0x7f25fc000e10, all_tables=0x7f25fc005ee0) at /root/work/stonedb-dev-202305026/sql/sql_parse.cc:5204
#6 0x0000000002420d9e in mysql_execute_command (thd=0x7f25fc000e10, first_level=true) at /root/work/stonedb-dev-202305026/sql/sql_parse.cc:2847
#7 0x0000000002428a8d in mysql_parse (thd=0x7f25fc000e10, parser_state=0x7f28c1d51f90) at /root/work/stonedb-dev-202305026/sql/sql_parse.cc:5642
#8 0x000000000241da84 in dispatch_command (thd=0x7f25fc000e10, com_data=0x7f28c1d52730, command=COM_QUERY) at /root/work/stonedb-dev-202305026/sql/sql_parse.cc:1495
#9 0x000000000241c8c5 in do_command (thd=0x7f25fc000e10) at /root/work/stonedb-dev-202305026/sql/sql_parse.cc:1034
#10 0x000000000254ee35 in handle_connection (arg=0x9467150) at /root/work/stonedb-dev-202305026/sql/conn_handler/connection_handler_per_thread.cc:313
#11 0x0000000002c1e674 in pfs_spawn_thread (arg=0x929f420) at /root/work/stonedb-dev-202305026/storage/perfschema/pfs.cc:2197
#12 0x00007f2911a3d1ca in start_thread () from /lib64/libpthread.so.0
#13 0x00007f290e8b3e73 in clone () from /lib64/libc.so.6
Pay particular attention to how filtering is handled for the join operation.
void ParameterizedFilter::PrepareJoiningStep(Condition &join_desc, Condition &desc, int desc_no, MultiIndex &mind_) {
// join parameters based on the first joining condition
DimensionVector dims1(mind_.NumOfDimensions());
desc[desc_no].DimensionUsed(dims1);
mind_.MarkInvolvedDimGroups(dims1);
DimensionVector cur_outer_dim(desc[desc_no].right_dims);
bool outer_present = !cur_outer_dim.IsEmpty();
// add join (two-table_) conditions first
for (uint i = desc_no; i < desc.Size(); i++) {
if (desc[i].done || desc[i].IsDelayed() || !desc[i].IsType_JoinSimple())
continue;
DimensionVector dims2(mind_.NumOfDimensions());
desc[i].DimensionUsed(dims2);
if (desc[i].right_dims == cur_outer_dim && (outer_present || dims1.Includes(dims2))) {
// can be executed together if all dimensions of the other condition are
// present in the base one or in case of outer join
join_desc.AddDescriptor(desc[i]);
desc[i].done = true; // setting in advance, as we already copied the
// descriptor to be processed
}
}
// add the rest of conditions (e.g. one-dimensional outer conditions), which
// are not "done" yet
for (uint i = desc_no; i < desc.Size(); i++) {
if (desc[i].done || desc[i].IsDelayed())
continue;
DimensionVector dims2(mind_.NumOfDimensions());
desc[i].DimensionUsed(dims2);
if (desc[i].right_dims == cur_outer_dim && (outer_present || dims1.Includes(dims2))) {
// can be executed together if all dimensions of the other condition are
// present in the base one or in case of outer join
join_desc.AddDescriptor(desc[i]);
desc[i].done = true; // setting in advance, as we already copied the descriptor to be processed
}
}
}
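As a rough sketch of the grouping rule used above (std::bitset standing in for DimensionVector; simplified, not the engine's code): a descriptor is batched with the base one when it has the same outer dims and either an outer join is present or all of its dimensions are already covered by the base condition.

#include <bitset>
#include <cstddef>
#include <vector>

constexpr int kMaxDims = 64;
using Dims = std::bitset<kMaxDims>;  // stand-in for DimensionVector

struct Desc {
  Dims dims;        // dimensions this condition touches
  Dims right_dims;  // non-empty => the nullable side of a LEFT JOIN
  bool done = false;
};

// dims1.Includes(dims2): every dimension used by the other condition is
// already present in the base one.
bool Includes(const Dims &base, const Dims &other) { return (other & ~base).none(); }

// Collect the descriptors that can be executed together with desc[desc_no].
std::vector<std::size_t> GroupWithBase(std::vector<Desc> &desc, std::size_t desc_no) {
  const Dims base = desc[desc_no].dims;
  const Dims cur_outer = desc[desc_no].right_dims;
  const bool outer_present = cur_outer.any();
  std::vector<std::size_t> group;
  for (std::size_t i = desc_no; i < desc.size(); ++i) {
    if (desc[i].done) continue;
    if (desc[i].right_dims == cur_outer && (outer_present || Includes(base, desc[i].dims))) {
      group.push_back(i);
      desc[i].done = true;  // as in the original: mark in advance
    }
  }
  return group;
}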
The inner join also goes through the hash join.
#0 Tianmu::core::ParallelHashJoiner::ExecuteJoinConditions (this=0x7f3084905fe0, cond=...) at /root/work/stonedb-dev-202305026/storage/tianmu/core/parallel_hash_join.cpp:236
#1 0x00000000030e76a3 in Tianmu::core::ParameterizedFilter::UpdateJoinCondition (this=0x7f30848f91a0, cond=..., tips=...)
at /root/work/stonedb-dev-202305026/storage/tianmu/core/parameterized_filter.cpp:718
#2 0x00000000030eb532 in Tianmu::core::ParameterizedFilter::UpdateMultiIndex (this=0x7f30848f91a0, count_only=false, limit=-1)
at /root/work/stonedb-dev-202305026/storage/tianmu/core/parameterized_filter.cpp:1444
#3 0x0000000002cff56e in Tianmu::core::Query::Preexecute (this=0x7f33962517d0, qu=..., sender=0x7f3084009ed0, display_now=true)
at /root/work/stonedb-dev-202305026/storage/tianmu/core/query.cpp:797
#4 0x0000000002ceba5e in Tianmu::core::Engine::Execute (this=0x644aeb0, thd=0x7f3084000e10, lex=0x7f3084003138, result_output=0x7f3084006ab8, unit_for_union=0x0)
at /root/work/stonedb-dev-202305026/storage/tianmu/core/engine_execute.cpp:513
#5 0x0000000002cea74b in Tianmu::core::Engine::HandleSelect (this=0x644aeb0, thd=0x7f3084000e10, lex=0x7f3084003138, result=@0x7f3396251dc8: 0x7f3084006ab8, setup_tables_done_option=0,
res=@0x7f3396251dc4: 0, is_optimize_after_tianmu=@0x7f3396251dbc: 1, tianmu_free_join=@0x7f3396251dc0: 1, with_insert=0)
at /root/work/stonedb-dev-202305026/storage/tianmu/core/engine_execute.cpp:243
#6 0x0000000003084438 in Tianmu::DBHandler::ha_my_tianmu_query (thd=0x7f3084000e10, lex=0x7f3084003138, result_output=@0x7f3396251dc8: 0x7f3084006ab8, setup_tables_done_option=0,
res=@0x7f3396251dc4: 0, is_optimize_after_tianmu=@0x7f3396251dbc: 1, tianmu_free_join=@0x7f3396251dc0: 1, with_insert=0)
at /root/work/stonedb-dev-202305026/storage/tianmu/sql/ha_my_tianmu.cpp:95
#7 0x0000000002427a28 in execute_sqlcom_select (thd=0x7f3084000e10, all_tables=0x7f3084005ee0) at /root/work/stonedb-dev-202305026/sql/sql_parse.cc:5204
#8 0x0000000002420d9e in mysql_execute_command (thd=0x7f3084000e10, first_level=true) at /root/work/stonedb-dev-202305026/sql/sql_parse.cc:2847
#9 0x0000000002428a8d in mysql_parse (thd=0x7f3084000e10, parser_state=0x7f3396252f90) at /root/work/stonedb-dev-202305026/sql/sql_parse.cc:5642
#10 0x000000000241da84 in dispatch_command (thd=0x7f3084000e10, com_data=0x7f3396253730, command=COM_QUERY) at /root/work/stonedb-dev-202305026/sql/sql_parse.cc:1495
#11 0x000000000241c8c5 in do_command (thd=0x7f3084000e10) at /root/work/stonedb-dev-202305026/sql/sql_parse.cc:1034
#12 0x000000000254ee35 in handle_connection (arg=0xa4d3440) at /root/work/stonedb-dev-202305026/sql/conn_handler/connection_handler_per_thread.cc:313
#13 0x0000000002c1e674 in pfs_spawn_thread (arg=0xa30b5c0) at /root/work/stonedb-dev-202305026/storage/perfschema/pfs.cc:2197
#14 0x00007f33a05751ca in start_thread () from /lib64/libpthread.so.0
#15 0x00007f339d3ebe73 in clone () from /lib64/libc.so.6
The hash join handles inner and outer joins completely differently; the outer join is practically just another implementation of a nested loop.
void ParallelHashJoiner::InitOuter(Condition &cond) {
DimensionVector outer_dims(cond[0].right_dims); // outer_dims will be filled with nulls for
// non-matching tuples
if (!outer_dims.IsEmpty()) {
if (traversed_dims_.Includes(outer_dims)) {
// Watch the non-outer dim for unmatched tuples and add them with nulls on
// outer dim.
watch_matched_ = true;
uint64_t origin_size = mind->NumOfTuples(matched_dims_);
for (int index = 0; index < mind->NumOfDimensions(); index++) {
if (matched_dims_[index]) {
origin_size = std::max(origin_size, mind->OrigSize(index));
}
}
outer_matched_filter_.reset(new MutexFilter(origin_size, pack_power_, true));
} else if (matched_dims_.Includes(outer_dims)) {
watch_traversed_ = true;
}
outer_nulls_only_ = true;
for (int j = 0; j < outer_dims.Size(); j++)
if (outer_dims[j] && tips.null_only[j] == false)
outer_nulls_only_ = false;
}
}
(gdb) p traversed_hash_tables_[0].outer_filter_.get()[0].filter_.get()[0]
$10 = (Tianmu::core::Filter) {
<Tianmu::mm::TraceableObject> = {
_vptr.TraceableObject = 0x4411118 <vtable for Tianmu::core::Filter+16>,
next = 0x0,
prev = 0x0,
tracker = 0x0,
static m_MemHandling = 0x8137820,
m_preUnused = false,
static globalFreeable = {
<std::__atomic_base<unsigned long>> = {
static _S_alignment = 8,
_M_i = 8
},
members of std::atomic<unsigned long>:
static is_always_lock_free = true
},
static globalUnFreeable = {
<std::__atomic_base<unsigned long>> = {
static _S_alignment = 8,
_M_i = 535880
},
members of std::atomic<unsigned long>:
static is_always_lock_free = true
},
m_sizeAllocated = 0,
owner = 0x0,
m_locking_mutex = @0x81378c0,
m_coord = {
ID = Tianmu::core::COORD_TYPE::COORD_UNKNOWN,
co = {
pack = {
<Tianmu::core::object_id_helper::empty> = {<No data fields>},
members of Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty>:
_coord = {0, 0, 0},
static ID = <optimized out>
},
rcattr = {
<Tianmu::core::object_id_helper::empty> = {<No data fields>},
members of Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)4, 2, Tianmu::core::object_id_helper::empty>:
_coord = {0, 0},
static ID = <optimized out>
}
}
},
m_lock_count = 1
},
members of Tianmu::core::Filter:
static FB_FULL = 0 '\000',
static FB_EMPTY = 1 '\001',
static FB_MIXED = 2 '\002',
static bitBlockSize = 8192,
no_blocks = 1,
block_status = 0x7f747c000fd0 "\002",
block_changed = std::vector<bool> of length 1, capacity 64 = {1},
blocks = 0x7f747c000fb0,
block_allocator = 0x7f747c001090,
no_of_bits_in_last_block = 277,
shallow = false,
block_last_one = 0x7f747c000ff0,
block_filter = 0x7f747c000e90,
delayed_stats = -1,
delayed_block = -1,
delayed_stats_set = -1,
delayed_block_set = -1,
no_power = 16,
pack_def = 65536,
bit_block_pool = 0x7f747c001010,
bit_mut = std::shared_ptr<std::mutex> (use count 1, weak count 0) = {
get() = 0x7f747c001060
}
}
(gdb) p traversed_hash_tables_[0].hash_table_.get()[0]
$17 = (Tianmu::core::HashTable) {
<Tianmu::mm::TraceableObject> = {
_vptr.TraceableObject = 0x441a428 <vtable for Tianmu::core::HashTable+16>,
next = 0x0,
prev = 0x0,
tracker = 0x0,
static m_MemHandling = 0x8137820,
m_preUnused = false,
static globalFreeable = {
<std::__atomic_base<unsigned long>> = {
static _S_alignment = 8,
_M_i = 0
},
members of std::atomic<unsigned long>:
static is_always_lock_free = true
},
static globalUnFreeable = {
<std::__atomic_base<unsigned long>> = {
static _S_alignment = 8,
_M_i = 535888
},
members of std::atomic<unsigned long>:
static is_always_lock_free = true
},
m_sizeAllocated = 4608,
owner = 0x0,
m_locking_mutex = @0x81378c0,
m_coord = {
ID = Tianmu::core::COORD_TYPE::COORD_UNKNOWN,
co = {
pack = {
<Tianmu::core::object_id_helper::empty> = {<No data fields>},
members of Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)0, 3, Tianmu::core::object_id_helper::empty>:
_coord = {0, 0, 0},
static ID = <optimized out>
},
rcattr = {
<Tianmu::core::object_id_helper::empty> = {<No data fields>},
members of Tianmu::core::ObjectId<(Tianmu::core::COORD_TYPE)4, 2, Tianmu::core::object_id_helper::empty>:
_coord = {0, 0},
static ID = <optimized out>
}
}
},
m_lock_count = 1
},
members of Tianmu::core::HashTable:
column_size_ = std::vector of length 3, capacity 4 = {1, 4, 8},
for_count_only_ = false,
column_offset_ = std::vector of length 3, capacity 3 = {0, 4, 8},
key_buf_width_ = 4,
total_width_ = 16,
multi_offset_ = 8,
rows_count_ = 277,
rows_limit_ = 249,
rows_of_occupied_ = 3,
buffer_ = 0x7f751a198000 "",
(gdb) p hash_table_finder
$46 = {
hash_table_ = 0x7f7474000b60,
key_buffer_ = 0x7f77ad6f3ef0,
matched_rows_ = 1,
current_row_ = 117,
to_be_returned_ = 1,
current_iterate_step_ = 79
}
For the rows found in each of the dimension tables, a hash probe is set up.
(gdb) p vc2_[0][0]
$55 = (Tianmu::vcolumn::SingleColumn) {
<Tianmu::vcolumn::VirtualColumn> = {
<Tianmu::vcolumn::VirtualColumnBase> = {
<Tianmu::core::Column> = {
ct = {
type = Tianmu::common::ColumnType::INT,
unsigned_flag_ = false,
precision = 11,
scale = 0,
internal_size = 4,
display_size = 11,
collation = {
collation = 0x446eae0 <my_charset_latin1>,
derivation = DERIVATION_NUMERIC,
repertoire = 3
},
fmt = Tianmu::common::PackFmt::DEFAULT,
flag = std::bitset
}
},
members of Tianmu::vcolumn::VirtualColumnBase:
_vptr.VirtualColumnBase = 0x440aae0 <vtable for Tianmu::vcolumn::SingleColumn+16>,
multi_index_ = 0x7f749c8f7410,
conn_info_ = 0x7f749c8f68a0,
var_map_ = std::vector of length 1, capacity 1 = {{
var_id = {
tab = -3,
col = 0
},
col_ndx = 0,
dim = 1,
just_a_table_ptr = 0x7f77ad6f4ba0,
just_a_table = std::weak_ptr<Tianmu::core::JustATable> (use count 4, weak count 2) = {
get() = 0x7f749c8f6af0
}
}},
params_ = std::set with 0 elements,
last_val_ = std::shared_ptr<Tianmu::core::ValueOrNull> (use count 1, weak count 0) = {
get() = 0x7f749c8fd7b0
},
first_eval_ = true,
dim_ = 1,
vc_min_val_ = -9223372036854775807,
vc_max_val_ = -9223372036854775807,
vc_nulls_possible_ = true,
vc_dist_vals_ = 4,
nulls_only_ = false
},
members of Tianmu::vcolumn::VirtualColumn:
vc_pack_guard_ = {
my_vc_ = @0x7f749c8fe360,
initialized_ = false,
last_pack_ = std::vector of length 0, capacity 0,
last_pack_thread_ = std::unordered_map with 1 element = {
[140151987603200] = std::unordered_map with 1 element = {
[1] = std::unordered_map with 1 element = {
[0] = 0
}
}
},
guardian_threads_ = 1,
mx_thread_ = {
<std::__mutex_base> = {
_M_mutex = {
__data = {
__lock = 0,
__count = 0,
__owner = 0,
__nusers = 0,
__kind = 0,
__spins = 0,
__elision = 0,
__list = {
__prev = 0x0,
__next = 0x0
}
},
__size = '\000' <repeats 39 times>,
__align = 0
}
}, <No data fields>},
current_strategy_ = Tianmu::core::VCPackGuardian::GUARDIAN_LOCK_STRATEGY::LOCK_ONE_THREAD
}
},
members of Tianmu::vcolumn::SingleColumn:
col_ = 0x7f749c8f6ee0
}
(gdb) p vc2_[0][0].var_map_[0]
$63 = {
var_id = {
tab = -3,
col = 0
},
col_ndx = 0,
dim = 1,
just_a_table_ptr = 0x7f77ad6f4ba0,
just_a_table = std::weak_ptr<Tianmu::core::JustATable> (use count 4, weak count 2) = {
get() = 0x7f749c8f6af0
}
}
These two virtual columns are quite interesting: the hash probe is built through vc1_ and vc2_.
std::vector<vcolumn::VirtualColumn *> vc1_;
std::vector<vcolumn::VirtualColumn *> vc2_;
bool ParallelHashJoiner::AddKeyColumn(vcolumn::VirtualColumn *vc, vcolumn::VirtualColumn *vc_matching) {
size_t column_index = column_bin_encoder_.size();
// Comparable, non-monotonic, non-decodable.
column_bin_encoder_.push_back(ColumnBinEncoder(ColumnBinEncoder::ENCODER_IGNORE_NULLS));
// common::CT::TIMESTAMP is omitted by ColumnValueEncoder::SecondColumn.
vcolumn::VirtualColumn *second_column =
(vc->Type().GetTypeName() == common::ColumnType::TIMESTAMP) ? nullptr : vc_matching;
bool success = column_bin_encoder_[column_index].PrepareEncoder(vc, second_column);
hash_table_key_size_.push_back(column_bin_encoder_[column_index].GetPrimarySize());
return success;
}
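All of the encoders selected below serve one purpose: turning each join column into a fixed-width, memcmp-comparable byte range inside a single key buffer. A minimal sketch of that idea (simplified; the real ColumnBinEncoder additionally handles NULLs, collations, and scale alignment):

#include <cstdint>
#include <cstring>
#include <string>

// Encode an int64 so that memcmp order equals numeric order:
// flip the sign bit and store the bytes big-endian.
void EncodeInt64(unsigned char *dst, int64_t v) {
  uint64_t u = static_cast<uint64_t>(v) ^ (1ULL << 63);
  for (int i = 7; i >= 0; --i) {
    dst[i] = static_cast<unsigned char>(u & 0xff);
    u >>= 8;
  }
}

// Two join columns (say an INT key and a DATE stored as int64) are packed back
// to back; the hash table then hashes and memcmp's the whole buffer as the key.
std::string EncodeKey(int64_t c1, int64_t c2) {
  std::string key(16, '\0');
  EncodeInt64(reinterpret_cast<unsigned char *>(&key[0]), c1);
  EncodeInt64(reinterpret_cast<unsigned char *>(&key[0]) + 8, c2);
  return key;
}

bool KeysEqual(const std::string &a, const std::string &b) {
  return a.size() == b.size() && std::memcmp(a.data(), b.data(), a.size()) == 0;
}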
bool ColumnBinEncoder::PrepareEncoder(vcolumn::VirtualColumn *_vc, vcolumn::VirtualColumn *_vc2) {
if (_vc == nullptr)
return false;
bool nulls_possible = false;
if (!ignore_nulls)
nulls_possible = _vc->IsNullsPossible() || (_vc2 != nullptr && _vc2->IsNullsPossible());
vc = _vc;
ColumnType vct = vc->Type();
ColumnType vct2 = _vc2 ? _vc2->Type() : ColumnType();
bool lookup_encoder = false;
bool text_stat_encoder = false;
if (vct.IsFixed() && (!_vc2 || (vct2.IsFixed() && vct.GetScale() == vct2.GetScale()))) { // int/dec of the same scale
my_encoder.reset(new ColumnBinEncoder::EncoderInt(vc, decodable, nulls_possible, descending));
} else if (vct.IsFloat() || (_vc2 && vct2.IsFloat())) {
my_encoder.reset(new ColumnBinEncoder::EncoderDouble(vc, decodable, nulls_possible, descending));
} else if (vct.IsFixed() && !vct2.IsString()) { // Decimals for different scale (the same
// scale is done by EncoderInt)
my_encoder.reset(new ColumnBinEncoder::EncoderDecimal(vc, decodable, nulls_possible, descending));
} else if (vct.GetTypeName() == common::ColumnType::DATE) {
my_encoder.reset(new ColumnBinEncoder::EncoderDate(vc, decodable, nulls_possible, descending));
} else if (vct.GetTypeName() == common::ColumnType::YEAR) {
my_encoder.reset(new ColumnBinEncoder::EncoderYear(vc, decodable, nulls_possible, descending));
} else if (!monotonic_encoding && vct.Lookup() && _vc2 == nullptr &&
!types::RequiresUTFConversions(vc->GetCollation())) { // Lookup encoding: only non-UTF
my_encoder.reset(new ColumnBinEncoder::EncoderLookup(vc, decodable, nulls_possible, descending));
lookup_encoder = true;
} else if (!monotonic_encoding && vct.Lookup() && _vc2 != nullptr &&
vct2.Lookup()) { // Lookup in joining - may be UTF
my_encoder.reset(new ColumnBinEncoder::EncoderLookup(vc, decodable, nulls_possible, descending));
lookup_encoder = true;
} else if (vct.IsString() || vct2.IsString()) {
DTCollation col_v1 = vc->GetCollation();
DTCollation coll = col_v1;
if (_vc2) {
DTCollation col_v2 = _vc2->GetCollation();
coll = types::ResolveCollation(col_v1, col_v2);
}
if (!noncomparable) // noncomparable => non-sorted cols in sorter, don't
// UTF-encode.
my_encoder.reset(new ColumnBinEncoder::EncoderText_UTF(vc, decodable, nulls_possible, descending));
else {
my_encoder.reset(new ColumnBinEncoder::EncoderTextStat(vc, decodable, nulls_possible, descending));
if (!my_encoder->Valid()) {
if (!monotonic_encoding && !decodable && vc->MaxStringSize() > HASH_FUNCTION_BYTE_SIZE)
my_encoder.reset(new ColumnBinEncoder::EncoderTextMD5(vc, decodable, nulls_possible, descending));
else
my_encoder.reset(new ColumnBinEncoder::EncoderText(vc, decodable, nulls_possible, descending));
} else
text_stat_encoder = true;
}
} else if (vct.IsDateTime() && (!_vc2 || vct2.IsDateTime())) { // Date/time types except special cases
// (above)
my_encoder.reset(new ColumnBinEncoder::EncoderInt(vc, decodable, nulls_possible, descending));
} else {
assert(!"wrong combination of encoded columns"); // Other types not
// implemented yet
my_encoder.reset(new ColumnBinEncoder::EncoderText(vc, decodable, nulls_possible, descending));
}
if (_vc2 != nullptr) { // multiple column encoding?
bool encoding_possible = my_encoder->SecondColumn(_vc2);
if (!encoding_possible) {
bool second_try = false;
if (lookup_encoder) { // try to use text encoder instead of lookup not
// need use EncoderText_UTF
my_encoder.reset(new ColumnBinEncoder::EncoderText(vc, decodable, nulls_possible, descending));
second_try = my_encoder->SecondColumn(_vc2);
}
if (text_stat_encoder) {
if (!monotonic_encoding && !decodable && vc->MaxStringSize() > HASH_FUNCTION_BYTE_SIZE)
my_encoder.reset(new ColumnBinEncoder::EncoderTextMD5(vc, decodable, nulls_possible, descending));
else
my_encoder.reset(new ColumnBinEncoder::EncoderText(vc, decodable, nulls_possible, descending));
second_try = my_encoder->SecondColumn(_vc2);
}
if (!second_try)
return false;
}
}
val_size = my_encoder->ValueSize();
val_sec_size = my_encoder->ValueSizeSec();
return true;
}
$67 = (Tianmu::core::ColumnBinEncoder::EncoderInt) {
<Tianmu::core::ColumnBinEncoder::ColumnValueEncoder> = {
_vptr.ColumnValueEncoder = 0x4409ac0 <vtable for Tianmu::core::ColumnBinEncoder::EncoderInt+16>,
descending = false,
null_status = 0,
vc_type = {
type = Tianmu::common::ColumnType::INT,
unsigned_flag_ = false,
precision = 11,
scale = 0,
internal_size = 4,
display_size = 11,
collation = {
collation = 0x446eae0 <my_charset_latin1>,
derivation = DERIVATION_NUMERIC,
repertoire = 3
},
fmt = Tianmu::common::PackFmt::DEFAULT,
flag = std::bitset = {
[0] = 1
}
},
size = 1,
size_sec = 0
},
members of Tianmu::core::ColumnBinEncoder::EncoderInt:
min_val = 1,
max_code = 2,
min_found = 9223372036854775807,
max_found = -9223372036854775808
}
void TraversedHashTable::AssignColumnEncoder(const std::vector<ColumnBinEncoder> &column_bin_encoder) {
column_bin_encoder_.assign(column_bin_encoder.begin(), column_bin_encoder.end());
}
int64_t HashTable::Finder::GetNextRow() {
if (to_be_returned_ == 0)
return common::NULL_VALUE_64;
int64_t row_to_return = current_row_;
to_be_returned_--;
if (to_be_returned_ == 0)
return row_to_return;
// now prepare the next row to be returned
int64_t startrow = current_row_;
current_row_ += current_iterate_step_;
if (current_row_ >= hash_table_->rows_count_)
current_row_ = current_row_ % hash_table_->rows_count_;
do {
assert(*((int *)(hash_table_->buffer_ + current_row_ * hash_table_->total_width_ + hash_table_->multi_offset_)) !=
0);
if (std::memcmp(hash_table_->buffer_ + current_row_ * hash_table_->total_width_, key_buffer_->data(),
key_buffer_->size()) == 0) {
// i.e. identical row found - keep the current_row_
return row_to_return;
} else { // some other value found - iterate one step forward
current_row_ += current_iterate_step_;
if (current_row_ >= hash_table_->rows_count_)
current_row_ = current_row_ % hash_table_->rows_count_;
}
} while (current_row_ != startrow);
return row_to_return;
}
It can be seen that the so-called hash table is one linear address space, handled much like aggregation.
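Before reading the real AddKeyValue below, here is a bare-bones sketch of the same layout (illustration only): one contiguous buffer of fixed-width rows, a hash picks the start slot, and probing steps through the buffer until a free or matching slot turns up. The real code additionally uses prime-sized steps and per-slot locking.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <string>
#include <vector>

// One contiguous buffer; every row is key bytes followed by an occupancy/count field.
struct LinearHashTable {
  std::size_t key_width, row_width, rows;
  std::vector<unsigned char> buffer;

  LinearHashTable(std::size_t key_w, std::size_t row_count)
      : key_width(key_w), row_width(key_w + sizeof(int64_t)), rows(row_count),
        buffer(row_width * row_count, 0) {}

  // Returns the slot used for this key, or -1 when the table is full.
  int64_t Add(const std::string &key) {
    assert(key.size() == key_width);
    std::size_t row = std::hash<std::string>{}(key) % rows;
    for (std::size_t probed = 0; probed < rows; ++probed, row = (row + 1) % rows) {
      unsigned char *cur = buffer.data() + row * row_width;
      int64_t count = 0;
      std::memcpy(&count, cur + key_width, sizeof(count));
      if (count == 0) {  // free slot: claim it
        std::memcpy(cur, key.data(), key_width);
        count = 1;
        std::memcpy(cur + key_width, &count, sizeof(count));
        return static_cast<int64_t>(row);
      }
      if (std::memcmp(cur, key.data(), key_width) == 0) {  // same key seen again
        ++count;
        std::memcpy(cur + key_width, &count, sizeof(count));
        return static_cast<int64_t>(row);
      }
      // otherwise a collision: keep stepping through the linear address space
    }
    return -1;  // no space left; the caller has to grow or restart
  }
};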
int64_t HashTable::AddKeyValue(const std::string &key_buffer, bool *too_many_conflicts) {
assert(key_buffer.size() == key_buf_width_);
if (rows_of_occupied_ >= rows_limit_) // no more space
return common::NULL_VALUE_64;
unsigned int crc_code = HashValue(key_buffer.c_str(), key_buffer.size());
int64_t row = crc_code % rows_count_;
// vertical table size is not dividable by the following step values, so they
// will eventually iterate through the whole table
int64_t iterate_step = prime_steps[crc_code % prime_steps_no];
int64_t startrow = row;
do {
std::scoped_lock lk(rows_lock_[row % rows_lock_.size()]);
unsigned char *cur_t = buffer_ + row * total_width_;
int64_t *multiplier = (int64_t *)(cur_t + multi_offset_);
if (*multiplier != 0) {
if (std::memcmp(cur_t, key_buffer.c_str(), key_buf_width_) == 0) {
// i.e. identical row found
int64_t last_multiplier = *multiplier;
assert(last_multiplier > 0);
(*multiplier)++;
if (for_count_only_)
return row;
// iterate several steps forward to find some free location
row += iterate_step * last_multiplier;
if (row >= rows_count_)
row = row % rows_count_;
if (*multiplier > kMaxHashConflicts && too_many_conflicts) // a threshold for switching sides
*too_many_conflicts = true;
} else { // some other value found
// iterate one step forward
row += iterate_step;
if (row >= rows_count_)
row = row % rows_count_;
}
} else {
*multiplier = 1;
std::memcpy(cur_t, key_buffer.c_str(), key_buf_width_);
rows_of_occupied_++;
return row;
}
} while (row != startrow);
assert("Never reach here!");
return common::NULL_VALUE_64; // search deadlock (we returned to the same
// position)
}
A fairly direct approach would be to split the outer join into two parts: one part is the inner-join result, and the other merges in the rows of the preserved (outer) table that found no match. That way the speed of the inner join's hash path can be reused directly.
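A compact sketch of that idea (illustration only, not the engine's code), with the preserved side as the probe side: run the probe exactly as for an inner join and pad a probe row with NULLs only when it finds no match, so there is no per-row rescan of the big table. When the preserved side is the build (traversed) side instead, a matched bitmap over the hash rows plays the same role, which appears to be what watch_traversed_ and the outer filter are for.

#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>

struct Row { int key; int payload; };

// LEFT JOIN as "hash inner join + NULL padding for unmatched preserved rows".
std::vector<std::pair<Row, std::optional<Row>>> HashLeftJoin(const std::vector<Row> &preserved,
                                                             const std::vector<Row> &nullable) {
  std::unordered_multimap<int, Row> build;  // build the hash on the nullable side
  for (const Row &r : nullable) build.emplace(r.key, r);

  std::vector<std::pair<Row, std::optional<Row>>> out;
  for (const Row &p : preserved) {
    auto [first, last] = build.equal_range(p.key);
    if (first == last) {
      out.push_back({p, std::nullopt});  // unmatched: emit once, padded with NULL
    } else {
      for (auto it = first; it != last; ++it)  // matched: identical to the inner join
        out.push_back({p, it->second});
    }
  }
  return out;  // one pass over each input, no repeated traversal of the big side
}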
But why abandon a conventional hash structure in favor of a linear probe over contiguous memory? Contiguous memory blocks do help vectorization, yet the current lookup implementation defeats the CPU cache and pipeline; the one advantage of this design is that no memory is wasted.
So the process of building the linear hash table is actually very straightforward.
int64_t ParallelHashJoiner::AsyncTraverseDim(TraverseTaskParams *params) {
params->traversed_hash_table->Initialize();
HashTable *hash_table = params->traversed_hash_table->hash_table();
std::string key_input_buffer(hash_table->GetKeyBufferWidth(), 0);
MIIterator &miter(*params->task_miter->GetIter());
int traversed_rows = 0;
while (params->task_miter->IsValid()) {
if (m_conn->Killed())
break;
if (miter.PackrowStarted()) {
for (int index = 0; index < cond_hashed_; ++index) vc1_[index]->LockSourcePacks(miter);
}
// Put key value to encoding.
bool omit_this_row = false;
for (int index = 0; index < cond_hashed_; index++) {
if (vc1_[index]->IsNull(miter)) {
omit_this_row = true;
break;
}
ColumnBinEncoder *column_bin_encoder = params->traversed_hash_table->GetColumnEncoder(index);
column_bin_encoder->Encode(reinterpret_cast<unsigned char *>(key_input_buffer.data()), miter, nullptr, true);
}
if (!omit_this_row) { // else go to the next row - equality cannot be
// fulfilled
int64_t hash_row = hash_table->AddKeyValue(key_input_buffer, &params->too_many_conflicts);
if (hash_row == common::NULL_VALUE_64) {
params->no_space_left = true;
break; // no space left - stop for now and then restart from the
// current row
}
if (!force_switching_sides_ && params->too_many_conflicts && !tianmu_sysvar_join_disable_switch_side)
break; // and exit the function
if (watch_traversed_)
params->traversed_hash_table->outer_filter()->Set(hash_row);
actually_traversed_rows_++;
// Put the tuple column. Note: needed also for count_only_now, because
// another conditions may need it.
if (!tips.count_only || other_cond_exist_) {
for (int index = 0; index < mind->NumOfDimensions(); ++index)
if (traversed_dims_[index])
hash_table->SetTupleValue(traversed_hash_column_[index], hash_row, miter[index]);
}
} else if (watch_traversed_) {
for (int index = 0; index < mind->NumOfDimensions(); ++index) {
if (matched_dims_[index]) {
params->build_item->SetTableValue(index, common::NULL_VALUE_64);
} else if (traversed_dims_[index]) {
params->build_item->SetTableValue(index, miter[index]);
}
}
params->build_item->CommitTableValues();
actually_traversed_rows_++;
params->outer_tuples++;
}
++miter;
traversed_rows++;
}
params->build_item->Finish();
return traversed_rows;
}
The inner join's timing is comparatively fast.
[2023-05-26 23:11:41.594371] [241238] [INFO] [parallel_hash_join.cpp:1052] MSG: AsyncMatchDim spend: 0.000557 watchmatched: false outer_nullsonly: false
[2023-05-26 23:11:41.614747] [241238] [INFO] [parallel_hash_join.cpp:1052] MSG: AsyncMatchDim spend: 0.018327 watchmatched: false outer_nullsonly: false
In the hash-lookup code below there are quite a few puzzling points in how the outer join is handled.
while (params->task_miter->IsValid() && !interrupt_matching_) {
if (m_conn->Killed())
break;
// Rough and locking part
bool omit_this_packrow = false;
bool packrow_uniform = false; // if the packrow is uniform, process it massively
if (miter.PackrowStarted()) {
packrow_uniform = true;
for (int index = 0; index < cond_hashed_; ++index) {
if (column_bin_encoder[index].IsString()) {
if (!vc2_[index]->Type().Lookup()) { // lookup treated as string, when the
// dictionaries aren't convertible
types::BString local_min = vc2_[index]->GetMinString(miter);
types::BString local_max = vc2_[index]->GetMaxString(miter);
if (!local_min.IsNull() && !local_max.IsNull() && ImpossibleValues(index, local_min, local_max)) {
omit_this_packrow = true;
break;
}
}
packrow_uniform = false;
} else {
int64_t local_min = vc2_[index]->GetMinInt64(miter);
int64_t local_max = vc2_[index]->GetMaxInt64(miter);
if (local_min == common::NULL_VALUE_64 || local_max == common::NULL_VALUE_64 || // common::NULL_VALUE_64
// only for nulls only
ImpossibleValues(index, local_min, local_max)) {
omit_this_packrow = true;
break;
}
if (other_cond_exist_ || local_min != local_max || vc2_[index]->IsNullsPossible()) {
packrow_uniform = false;
}
}
}
packrows_matched_++;
if (packrow_uniform && !omit_this_packrow) {
for (int index = 0; index < cond_hashed_; ++index) {
int64_t local_min = vc2_[index]->GetMinInt64(miter);
column_bin_encoder[index].PutValue64(reinterpret_cast<unsigned char *>(key_input_buffer.data()), local_min,
true, false);
}
for (auto &traversed_hash_table : traversed_hash_tables_) {
HashTable *hash_table = traversed_hash_table.hash_table();
HashTable::Finder hash_table_finder(hash_table, &key_input_buffer);
int64_t matching_rows = hash_table_finder.GetMatchedRows() * miter.GetPackSizeLeft();
if (!tips.count_only)
while ((hash_row = hash_table_finder.GetNextRow()) != common::NULL_VALUE_64) {
MIIterator mit_this_pack(miter);
int64_t matching_this_pack = matching_row;
do {
SubmitJoinedTuple(params->build_item.get(), &traversed_hash_table, hash_row, mit_this_pack);
if (watch_matched_) {
outer_matched_filter_->ResetDelayed(matching_this_pack, true);
}
++mit_this_pack;
matching_this_pack++;
} while (params->task_miter->IsValid(&mit_this_pack) && !mit_this_pack.PackrowStarted());
}
else if (watch_traversed_) {
while ((hash_row = hash_table_finder.GetNextRow()) != common::NULL_VALUE_64) {
traversed_hash_table.outer_filter()->Reset(hash_row, true);
}
}
joined_tuples += matching_rows;
}
omit_this_packrow = true;
}
if (omit_this_packrow) {
matching_row += miter.GetPackSizeLeft();
miter.NextPackrow();
packrows_omitted_++;
continue; // here we are jumping out for impossible or uniform packrow
}
for (int i = 0; i < cond_hashed_; i++) vc2_[i]->LockSourcePacks(miter);
}
// Exact part - make the key row ready for comparison
bool null_found = false;
bool non_matching_sizes = false;
for (int index = 0; index < cond_hashed_; ++index) {
if (vc2_[index]->IsNull(miter)) {
null_found = true;
break;
}
column_bin_encoder[index].Encode(reinterpret_cast<unsigned char *>(key_input_buffer.data()), miter, vc2_[index]);
}
if (!null_found && !non_matching_sizes) { // else go to the next row -
// equality cannot be fulfilled
for (auto &traversed_hash_table : traversed_hash_tables_) {
HashTable *hash_table = traversed_hash_table.hash_table();
HashTable::Finder hash_table_finder(hash_table, &key_input_buffer);
int64_t matching_rows = hash_table_finder.GetMatchedRows();
// Find all matching rows
if (!other_cond_exist_) {
// Basic case - just equalities
if (!tips.count_only)
while ((hash_row = hash_table_finder.GetNextRow()) != common::NULL_VALUE_64)
SubmitJoinedTuple(params->build_item.get(), &traversed_hash_table, hash_row, miter);
else if (watch_traversed_) {
while ((hash_row = hash_table_finder.GetNextRow()) != common::NULL_VALUE_64) {
traversed_hash_table.outer_filter()->Reset(hash_row, true);
}
}
if (watch_matched_ && matching_rows > 0) {
outer_matched_filter_->ResetDelayed(matching_row, true);
}
joined_tuples += matching_rows;
} else {
// Complex case: different types of join conditions mixed together
combined_mit.Combine(miter);
while ((hash_row = hash_table_finder.GetNextRow()) != common::NULL_VALUE_64) {
bool other_cond_true = true;
for (int i = 0; i < mind->NumOfDimensions(); i++) {
if (traversed_dims_[i])
combined_mit.Set(i, hash_table->GetTupleValue(traversed_hash_column_[i], hash_row));
}
for (auto &j : other_cond_) {
j.LockSourcePacks(combined_mit);
if (j.CheckCondition(combined_mit) == false) {
other_cond_true = false;
break;
}
}
if (other_cond_true) {
if (!tips.count_only)
SubmitJoinedTuple(params->build_item.get(), &traversed_hash_table, hash_row,
miter); // use the multiindex iterator position
else if (watch_traversed_) {
traversed_hash_table.outer_filter()->Reset(hash_row, true);
}
joined_tuples++;
if (watch_matched_) {
outer_matched_filter_->ResetDelayed(matching_row, true);
}
}
}
}
}
if (!outer_nulls_only_) {
if (tips.limit != -1 && tips.limit <= joined_tuples) {
interrupt_matching_ = true;
break;
}
}
}
++miter;
matching_row++;
}
Adding timing statistics to the traversal lookup makes the problem obvious: it traverses 3303219200 rows in 297.124878 seconds.
[2023-05-27 00:03:18.640459] [261740] [INFO] [parallel_hash_join.cpp:1049] MSG: AsyncMatchDim traversed spend: 297.124878 matching_row: 3303219200
[2023-05-27 00:03:18.640525] [261740] [INFO] [parallel_hash_join.cpp:1064] MSG: AsyncMatchDim spend: 297.125000 watch_matched_: false outer_nulls_only_: false other_cond_exist_: true
But the inner join traverses only 3251200 rows. Note that 3303219200 = 3251200 × 1016, the table's row count multiplied by the 1016 rows of driving table a, so the outer join is effectively rescanning the whole table once per driving row: a nested loop in disguise. What a bizarre beast.