Closed Guangggggg closed 1 year ago
What problem does this PR solve?
A small table is not broadcast when performing a join, even though it should be. With 'spark.sql.adaptive.enabled' set to false and the table size below 'spark.sql.autoBroadcastJoinThreshold' (that is, TableSizeEstimator.estimatedTableSize < 'spark.sql.autoBroadcastJoinThreshold'), the table is still not broadcast.
What is changed and how it works?
This PR creates the 'TiDBTableScan' class, which provides the table's 'sizeInBytes' when the physical plan is generated. That value can then be used to determine whether to broadcast the table. A Scan class of this kind is also common for other file formats such as Parquet, ORC, and so on.
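To illustrate why a reported 'sizeInBytes' matters, here is a minimal toy model of a size-based broadcast decision. It is not Spark's or TiSpark's code: the names `Relation`, `can_broadcast` and `choose_join` are hypothetical, the 10 MB threshold is assumed to match Spark's default, and a relation without size statistics is assumed to look very large to the planner.

```python
from dataclasses import dataclass

# Assumption: mirrors the default of spark.sql.autoBroadcastJoinThreshold (10 MB).
AUTO_BROADCAST_JOIN_THRESHOLD = 10 * 1024 * 1024

# Assumption: a relation that reports no statistics is treated as extremely
# large, modelled here as the maximum signed 64-bit value.
UNKNOWN_SIZE = 2**63 - 1


@dataclass
class Relation:
    """Hypothetical stand-in for a table scan that may report its size."""
    name: str
    size_in_bytes: int = UNKNOWN_SIZE


def can_broadcast(relation: Relation,
                  threshold: int = AUTO_BROADCAST_JOIN_THRESHOLD) -> bool:
    # A negative threshold disables broadcasting, because no
    # non-negative size can be <= a negative number.
    return 0 <= relation.size_in_bytes <= threshold


def choose_join(build_side: Relation,
                threshold: int = AUTO_BROADCAST_JOIN_THRESHOLD) -> str:
    # Only the build side (the right side of the left outer join in the
    # test below) is checked in this simplified model.
    if can_broadcast(build_side, threshold):
        return "BroadcastHashJoin"
    return "SortMergeJoin"


if __name__ == "__main__":
    # Without a reported size the planner falls back to a sort-merge join.
    print(choose_join(Relation("person")))
    # With a small reported size the broadcast join becomes eligible.
    print(choose_join(Relation("person", size_in_bytes=4096)))
```

In this model the only difference between the two calls is whether the scan reports a size, which corresponds to the role 'TiDBTableScan' plays in this PR.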
Check List
Tests
====================================step1:create_table=====================================
CREATE TABLE person (id INT(11), name VARCHAR(255), birthday DATE);
INSERT INTO person VALUES(1,'tom','20170912');
INSERT INTO person VALUES(2,'tom','20170912');
INSERT INTO person VALUES(3,'tom','20170912');
INSERT INTO person VALUES(4,'tom','20170912');
INSERT INTO person VALUES(5,'tom','20170912');
INSERT INTO person VALUES(6,'tom','20170912');
INSERT INTO person VALUES(7,'tom','20170912');
INSERT INTO person VALUES(8,'tom','20170912');
INSERT INTO person VALUES(9,'tom','20170912');
=====================================step2:script=========================================
===========================result:physical plan before modification==============================
(5) SortMergeJoin [id#6L], [id#9L], LeftOuter
:- (2) Sort [id#6L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#6L, 200), ENSURE_REQUIREMENTS, [id=#27]
:     +- (1) ColumnarToRow
:        +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255)....
+- (4) Sort [id#9L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#9L, 200), ENSURE_REQUIREMENTS, [id=#35]
      +- *(3) ColumnarToRow
         +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255)....
===========================result:physical plan after modification==============================
(2) BroadcastHashJoin [id#6L], [id#9L], LeftOuter, BuildRight, false
:- (2) ColumnarToRow
:  +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255).....
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#22]
   +- *(1) ColumnarToRow
      +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255)....