pingcap / tispark

TiSpark is built for running Apache Spark on top of TiDB/TiKV
Apache License 2.0
880 stars 244 forks source link

Resolve the problem of the generating of BroadcastJoin in physical plan #2712

Closed Guangggggg closed 1 year ago

Guangggggg commented 1 year ago

What problem does this PR solve?

A small table can not be generated BroadcastJoin Physical Plan.

The table size is less than default value of 'spark.sql.autoBroadcastJoinThreshold' .

And although TableSizeEstimator.estimatedTableSize < 'spark.sql.autoBroadcastJoinThreshold' , the physical plan is not still BroadcastJoin.

In some cases, this will reduce the execution efficiency of tispark, for example, when the user sets 'spark.sql.adaptive.enabled=false', the table will never be brocast

What is changed and how it works?

This PR create the 'TiDBTableScan' class to provide 'sizeInBytes' of table when generate physical plan, which can be used to determined whether to broadcast a table. And Scan class is common in other file formats such as parquet,orc and so on.

Check List

Tests

//create a table in tidb
CREATE TABLE person (id INT(11),name VARCHAR(255),birthday DATE);
INSERT INTO person VALUES(1,'tom','20170912');
INSERT INTO person VALUES(2,'tom','20170912');
INSERT INTO person VALUES(3,'tom','20170912');
INSERT INTO person VALUES(4,'tom','20170912');
INSERT INTO person VALUES(5,'tom','20170912');
INSERT INTO person VALUES(6,'tom','20170912');
INSERT INTO person VALUES(7,'tom','20170912');
INSERT INTO person VALUES(8,'tom','20170912');
INSERT INTO person VALUES(9,'tom','20170912');
SparkSession spark = SparkSession.builder().master("local[*]")
            .config("spark.sql.extensions","org.apache.spark.sql.TiExtensions")
            .config("spark.sql.catalog.tidb_catalog","org.apache.spark.sql.catalyst.catalog.TiCatalog")
            .config("spark.tispark.pd.addresses","127.0.0.1:2379")
            .config("spark.sql.catalog.tidb_catalog.pd.addresses","127.0.0.1:2379")
            .config("spark.sql.adaptive.enabled","false")
            .getOrCreate();
Dataset<Row> dataset = spark.sql("select * from tidb_catalog.test.person t1 left join tidb_catalog.test.person t2 on t1.id = t2.id");
     dataset.explain(true);
===========================result:physical plan before modification==============================

*(5) SortMergeJoin [id#6L], [id#9L], LeftOuter
:- *(2) Sort [id#6L ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(id#6L, 200), ENSURE_REQUIREMENTS, [id=#27]
:       +- *(1) ColumnarToRow
:         +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255)....
+- *(4) Sort [id#9L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#9L, 200), ENSURE_REQUIREMENTS, [id=#35]
        +- *(3) ColumnarToRow
          +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255)....

===========================result:physical plan after modification==============================

*(2) BroadcastHashJoin [id#6L], [id#9L], LeftOuter, BuildRight, false
:- *(2) ColumnarToRow
:    +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255).....
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#22]
     +- *(1) ColumnarToRow
        +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255)...

Code changes

Side effects

Related changes

ti-chi-bot[bot] commented 1 year ago

[REVIEW NOTIFICATION]

This pull request has not been approved.

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment. After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review. Reviewer can cancel approval by submitting a request changes review.
sre-bot commented 1 year ago

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Guangggggg seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.