pingcap / tispark

TiSpark is built for running Apache Spark on top of TiDB/TiKV
Apache License 2.0

Resolve the problem of the generating of BroadcastJoin in physical plan #2713

Closed Guangggggg closed 1 year ago

Guangggggg commented 1 year ago

What problem does this PR solve?

Spark does not generate a BroadcastJoin physical plan for a small TiDB table.

This happens even when the table size is less than the default value of 'spark.sql.autoBroadcastJoinThreshold'.

Even though TableSizeEstimator.estimatedTableSize < 'spark.sql.autoBroadcastJoinThreshold', the physical plan is still not a BroadcastJoin.

In some cases this reduces the execution efficiency of TiSpark. For example, when the user sets 'spark.sql.adaptive.enabled=false', the table will never be broadcast.

What is changed and how it works?

This PR creates the 'TiDBTableScan' class to provide the table's 'sizeInBytes' when the physical plan is generated, which can be used to determine whether to broadcast the table. A Scan class of this kind is also common for other file formats such as Parquet and ORC.
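The decision described above can be illustrated without Spark on the classpath. The following is a minimal sketch under stated assumptions, not the PR's implementation: `BroadcastDecision`, `SizedScan`, `JoinStrategy`, and `chooseJoin` are hypothetical names invented for this example. The only values taken from Spark are the config key 'spark.sql.autoBroadcastJoinThreshold' and its 10 MB default. It models why a scan that reports a size estimate can be broadcast, while a scan that reports none falls back to a sort-merge join.

```java
import java.util.OptionalLong;

// Hypothetical model of a size-based join choice; not TiSpark or Spark code.
final class BroadcastDecision {
    /** Stand-in for a scan that may or may not report a size estimate. */
    interface SizedScan {
        OptionalLong estimatedSizeInBytes();
    }

    enum JoinStrategy { BROADCAST_HASH_JOIN, SORT_MERGE_JOIN }

    /** Default of spark.sql.autoBroadcastJoinThreshold: 10 MB. */
    static final long DEFAULT_THRESHOLD_BYTES = 10L * 1024 * 1024;

    /** Broadcast only when an estimate exists and fits under the threshold. */
    static JoinStrategy chooseJoin(SizedScan buildSide, long thresholdBytes) {
        if (thresholdBytes < 0) {
            // Assumption: a negative threshold disables size-based broadcasting.
            return JoinStrategy.SORT_MERGE_JOIN;
        }
        OptionalLong size = buildSide.estimatedSizeInBytes();
        boolean fits = size.isPresent()
                && size.getAsLong() >= 0
                && size.getAsLong() <= thresholdBytes;
        return fits ? JoinStrategy.BROADCAST_HASH_JOIN : JoinStrategy.SORT_MERGE_JOIN;
    }

    public static void main(String[] args) {
        SizedScan withEstimate = () -> OptionalLong.of(4096L); // small table
        SizedScan withoutEstimate = OptionalLong::empty;       // no size reported
        System.out.println(chooseJoin(withEstimate, DEFAULT_THRESHOLD_BYTES));
        System.out.println(chooseJoin(withoutEstimate, DEFAULT_THRESHOLD_BYTES));
    }
}
```

In this model the first call selects `BROADCAST_HASH_JOIN` and the second selects `SORT_MERGE_JOIN`, which mirrors the before and after plans reported in the tests of this PR.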

Check List

Tests

-- Create a table in TiDB
CREATE TABLE person (id INT(11),name VARCHAR(255),birthday DATE);
INSERT INTO person VALUES(1,'tom','20170912');
INSERT INTO person VALUES(2,'tom','20170912');
INSERT INTO person VALUES(3,'tom','20170912');
INSERT INTO person VALUES(4,'tom','20170912');
INSERT INTO person VALUES(5,'tom','20170912');
INSERT INTO person VALUES(6,'tom','20170912');
INSERT INTO person VALUES(7,'tom','20170912');
INSERT INTO person VALUES(8,'tom','20170912');
INSERT INTO person VALUES(9,'tom','20170912');
// Build a SparkSession with the TiSpark extension and the TiDB catalog
SparkSession spark = SparkSession.builder().master("local[*]")
        .config("spark.sql.extensions","org.apache.spark.sql.TiExtensions")
        .config("spark.sql.catalog.tidb_catalog","org.apache.spark.sql.catalyst.catalog.TiCatalog")
        .config("spark.tispark.pd.addresses","127.0.0.1:2379")
        .config("spark.sql.catalog.tidb_catalog.pd.addresses","127.0.0.1:2379")
        .getOrCreate();
// Self-join the small table and print the logical and physical plans
Dataset<Row> dataset = spark.sql("select * from tidb_catalog.test.person t1 left join tidb_catalog.test.person t2 on t1.id = t2.id");
dataset.explain(true);
===========================result:physical plan before modification==============================

*(5) SortMergeJoin [id#6L], [id#9L], LeftOuter
:- *(2) Sort [id#6L ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(id#6L, 200), ENSURE_REQUIREMENTS, [id=#27]
:       +- *(1) ColumnarToRow
:         +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255)....
+- *(4) Sort [id#9L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#9L, 200), ENSURE_REQUIREMENTS, [id=#35]
        +- *(3) ColumnarToRow
          +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255)....

===========================result:physical plan after modification==============================

*(2) BroadcastHashJoin [id#6L], [id#9L], LeftOuter, BuildRight, false
:- *(2) ColumnarToRow
:    +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255).....
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#22]
     +- *(1) ColumnarToRow
        +- TiKV CoprocessorRDD{[table: person] TableReader, Columns: id@LONG, name@VARCHAR(255)...
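
The before/after comparison above is made by reading the explain output. In an automated test, the same check could be done on the plan text. The following is a small sketch, assuming the plan is available as a string. `PlanCheck` and `firstJoinOperator` are hypothetical helpers, not part of TiSpark or Spark; only the operator names are real Spark physical operators.

```java
import java.util.List;

// Hypothetical helper for asserting on explain output; not TiSpark or Spark code.
final class PlanCheck {
    // Spark physical join operator names as they appear in explain output.
    static final List<String> JOIN_OPERATORS = List.of(
            "BroadcastHashJoin", "SortMergeJoin", "ShuffledHashJoin", "BroadcastNestedLoopJoin");

    /** Returns the first join operator named in the plan text, or "none". */
    static String firstJoinOperator(String planText) {
        for (String line : planText.split("\n")) {
            for (String op : JOIN_OPERATORS) {
                if (line.contains(op)) {
                    return op;
                }
            }
        }
        return "none";
    }

    public static void main(String[] args) {
        // Root lines copied from the plans shown in this PR.
        String before = "*(5) SortMergeJoin [id#6L], [id#9L], LeftOuter\n"
                + ":- *(2) Sort [id#6L ASC NULLS FIRST], false, 0";
        String after = "*(2) BroadcastHashJoin [id#6L], [id#9L], LeftOuter, BuildRight, false\n"
                + "+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#22]";
        System.out.println(firstJoinOperator(before)); // SortMergeJoin
        System.out.println(firstJoinOperator(after));  // BroadcastHashJoin
    }
}
```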

Code changes

Side effects

Related changes

sre-bot commented 1 year ago

CLA assistant check
All committers have signed the CLA.

shiyuhang0 commented 1 year ago

Thank you for your contribution. Please:

  1. fix the formatting by running mvn mvn-scalafmt_2.12:format -Dscalafmt.skip=false
  2. add a license header to the newly added file. You can copy one from another file. Remember to change the year to 2013

shiyuhang0 commented 1 year ago

LGTM~

shiyuhang0 commented 1 year ago

/run-all-tests

Guangggggg commented 1 year ago

LGTM~

There is one pending check. What do I need to do to merge?

shiyuhang0 commented 1 year ago

@zhangyangyu @xuanyu66 PTAL

ti-chi-bot[bot] commented 1 year ago

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: shiyuhang0, xuanyu66

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:
- ~~[OWNERS](https://github.com/pingcap/tispark/blob/master/OWNERS)~~ [shiyuhang0,xuanyu66]

Approvers can indicate their approval by writing `/approve` in a comment.
Approvers can cancel approval by writing `/approve cancel` in a comment.