apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

[Spool] Actual implementation #14507

Open gortiz opened 2 days ago

gortiz commented 2 days ago

This PR is a continuation of https://github.com/apache/pinot/issues/14495 and the next step toward https://github.com/apache/pinot/issues/14196.

I'm opening this as a draft. The PR is not complete yet because it has no automated tests. Still, publishing it early may help us decide whether https://github.com/apache/pinot/issues/14495 should be merged first, or whether it is better to close that (small) PR and merge this one directly once it is finished.

As an example of how it works, you can run the following query in ColocatedJoinEngineQuickStart:

```sql
SET useSpools = true;
explain implementation plan for
select * from userAttributes as a1
join userGroups as a2
on a1.userUUID = a2.userUUID
join userAttributes as a3
on a1.userUUID = a3.userUUID
limit 10
```

which returns:

```
[0]@192.168.1.42:46155|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)
├── [1]@192.168.1.42:44265|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46155|[0]} (Subtree Omitted)
├── [1]@192.168.1.42:34477|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46155|[0]} (Subtree Omitted)
├── [1]@192.168.1.42:44185|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46155|[0]} (Subtree Omitted)
└── [1]@192.168.1.42:34303|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46155|[0]}
    └── [1]@192.168.1.42:34303|[3] SORT LIMIT 10
        └── [1]@192.168.1.42:34303|[3] MAIL_RECEIVE(HASH_DISTRIBUTED)
            ├── [2]@192.168.1.42:44265|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:37717|[3],[1]@192.168.1.42:43409|[1],[1]@192.168.1.42:43851|[2],[1]@192.168.1.42:46503|[0]} (Subtree Omitted)
            ├── [2]@192.168.1.42:34477|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:37717|[3],[1]@192.168.1.42:43409|[1],[1]@192.168.1.42:43851|[2],[1]@192.168.1.42:46503|[0]} (Subtree Omitted)
            ├── [2]@192.168.1.42:44185|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:37717|[3],[1]@192.168.1.42:43409|[1],[1]@192.168.1.42:43851|[2],[1]@192.168.1.42:46503|[0]} (Subtree Omitted)
            └── [2]@192.168.1.42:34303|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:37717|[3],[1]@192.168.1.42:43409|[1],[1]@192.168.1.42:43851|[2],[1]@192.168.1.42:46503|[0]}
                └── [2]@192.168.1.42:34303|[3] SORT LIMIT 10
                    └── [2]@192.168.1.42:34303|[3] JOIN
                        ├── [2]@192.168.1.42:34303|[3] MAIL_RECEIVE(HASH_DISTRIBUTED)
                        │   ├── [3]@192.168.1.42:44265|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@192.168.1.42:46503|[0]} (Subtree Omitted)
                        │   ├── [3]@192.168.1.42:34477|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@192.168.1.42:43409|[1]} (Subtree Omitted)
                        │   ├── [3]@192.168.1.42:44185|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@192.168.1.42:43851|[2]} (Subtree Omitted)
                        │   └── [3]@192.168.1.42:34303|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[2]@192.168.1.42:37717|[3]}
                        │       └── [3]@192.168.1.42:34303|[3] JOIN
                        │           ├── [3]@192.168.1.42:34303|[3] MAIL_RECEIVE(HASH_DISTRIBUTED)
spool here ->           │           │   ├── [4]@192.168.1.42:44185|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@192.168.1.42:37717|[3],[2]@192.168.1.42:43409|[1],[2]@192.168.1.42:43851|[2],[2]@192.168.1.42:46503|[0],[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]} (Subtree Omitted)
                        │           │   └── [4]@192.168.1.42:44265|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@192.168.1.42:37717|[3],[2]@192.168.1.42:43409|[1],[2]@192.168.1.42:43851|[2],[2]@192.168.1.42:46503|[0],[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]}
                        │           │       └── [4]@192.168.1.42:44265|[0] PROJECT
                        │           │           └── [4]@192.168.1.42:44265|[0] TABLE SCAN (userAttributes) null
                        │           └── [3]@192.168.1.42:34303|[3] MAIL_RECEIVE(HASH_DISTRIBUTED)
                        │               ├── [5]@192.168.1.42:44185|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]} (Subtree Omitted)
                        │               └── [5]@192.168.1.42:44265|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]}
                        │                   └── [5]@192.168.1.42:44265|[0] PROJECT
                        │                       └── [5]@192.168.1.42:44265|[0] TABLE SCAN (userGroups) null
                        └── [2]@192.168.1.42:34303|[3] MAIL_RECEIVE(HASH_DISTRIBUTED)
spool here ->               ├── [4]@192.168.1.42:44185|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@192.168.1.42:37717|[3],[2]@192.168.1.42:43409|[1],[2]@192.168.1.42:43851|[2],[2]@192.168.1.42:46503|[0],[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]} (Subtree Omitted)
                            └── [4]@192.168.1.42:44265|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[2]@192.168.1.42:37717|[3],[2]@192.168.1.42:43409|[1],[2]@192.168.1.42:43851|[2],[2]@192.168.1.42:46503|[0],[3]@192.168.1.42:37717|[3],[3]@192.168.1.42:43409|[1],[3]@192.168.1.42:43851|[2],[3]@192.168.1.42:46503|[0]}
                                └── [4]@192.168.1.42:44265|[0] PROJECT
                                    └── [4]@192.168.1.42:44265|[0] TABLE SCAN (userAttributes) null
```
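In the plan, stage 4 (the `userAttributes` scan) is marked as the spool. A single stage sends to the receivers of both stage 2 and stage 3, so `userAttributes` is scanned once instead of once per join side. The Java sketch below illustrates that fan-out idea under stated assumptions. It assumes equivalent stages can be detected by comparing a canonical string signature of their plan subtrees. `Stage`, `assignSpools`, and the pre-rewrite stage id 6 are hypothetical, chosen for illustration only. This is not the code the PR adds: the PR touches classes such as `EquivalentStagesReplacer` and `MailboxAssignmentVisitor`, and their logic may differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SpoolSketch {
  /** Hypothetical stage: its id, a canonical signature of its plan subtree, and the stage that reads from it. */
  record Stage(int id, String subtreeSignature, int consumerStageId) { }

  /**
   * Keeps the first stage of every group of equivalent stages (same signature) as the shared
   * producer and redirects the consumers of the duplicates to it.
   * Returns: producer stage id -> ids of the stages it must send to, in encounter order.
   */
  static Map<Integer, List<Integer>> assignSpools(List<Stage> stages) {
    Map<String, Integer> producerBySignature = new LinkedHashMap<>();
    Map<Integer, List<Integer>> consumersByProducer = new LinkedHashMap<>();
    for (Stage stage : stages) {
      // The first stage seen with a given signature becomes the producer for that signature.
      int producer = producerBySignature.computeIfAbsent(stage.subtreeSignature(), sig -> stage.id());
      // Its consumer, and the consumer of every later equivalent stage, receives from that producer.
      consumersByProducer.computeIfAbsent(producer, id -> new ArrayList<>()).add(stage.consumerStageId());
    }
    return consumersByProducer;
  }

  public static void main(String[] args) {
    // Illustrative input modeled on the example query: userAttributes is read by stages 3 and 2.
    List<Stage> stages = List.of(
        new Stage(4, "PROJECT(TABLE SCAN(userAttributes))", 3),
        new Stage(5, "PROJECT(TABLE SCAN(userGroups))", 3),
        new Stage(6, "PROJECT(TABLE SCAN(userAttributes))", 2));
    System.out.println(assignSpools(stages)); // prints {4=[3, 2], 5=[3]}
  }
}
```

Comparing signatures keeps the equivalence check trivial. A real planner would probably also need to confirm that the duplicated stages agree on distribution and parallelism before merging them, and this sketch ignores that.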

TODO:

codecov-commenter commented 2 days ago

Codecov Report

Attention: Patch coverage is 2.96296% with 262 lines in your changes missing coverage. Please review.

Project coverage is 34.55%. Comparing base (59551e4) to head (f06138a). Report is 1371 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...ery/planner/physical/MailboxAssignmentVisitor.java | 0.00% | 64 missing |
| .../pinot/query/planner/plannode/MailboxSendNode.java | 0.00% | 34 missing |
| .../pinot/query/planner/plannode/PlanNodeVisitor.java | 0.00% | 27 missing |
| ...uery/planner/logical/EquivalentStagesReplacer.java | 0.00% | 19 missing |
| ...hysical/colocated/GreedyShuffleRewriteVisitor.java | 0.00% | 19 missing |
| ...ot/query/runtime/operator/MailboxSendOperator.java | 0.00% | 16 missing |
| ...ry/planner/explain/PhysicalExplainPlanVisitor.java | 0.00% | 13 missing |
| ...query/runtime/operator/exchange/BlockExchange.java | 0.00% | 12 missing |
| ...va/org/apache/pinot/query/runtime/QueryRunner.java | 0.00% | 9 missing |
| .../java/org/apache/pinot/query/QueryEnvironment.java | 42.85% | 8 missing |

... and 11 more

There is a different number of reports uploaded between BASE (59551e4) and HEAD (f06138a).

HEAD has 6 fewer uploads than BASE:

| Flag | BASE (59551e4) | HEAD (f06138a) |
|------|------|------|
| skip-bytebuffers-false | 7 | 6 |
| unittests | 5 | 3 |
| unittests1 | 2 | 0 |
| java-11 | 5 | 4 |
Additional details and impacted files

```diff
@@              Coverage Diff              @@
##             master   #14507       +/-   ##
=============================================
- Coverage     61.75%   34.55%   -27.20%
- Complexity      207      779      +572
=============================================
  Files          2436     2674      +238
  Lines        133233   146824    +13591
  Branches      20636    22513     +1877
=============================================
- Hits          82274    50742    -31532
- Misses        44911    91979    +47068
+ Partials       6048     4103     -1945
```

| Flag | Coverage Δ |
|---|---|
| custom-integration1 | `100.00% <ø> (+99.99%)` |
| integration | `100.00% <ø> (+99.99%)` |
| integration1 | `100.00% <ø> (+99.99%)` |
| integration2 | `0.00% <ø> (ø)` |
| java-11 | `34.54% <2.96%> (-27.17%)` |
| java-21 | `34.54% <2.96%> (-27.08%)` |
| skip-bytebuffers-false | `34.55% <2.96%> (-27.19%)` |
| skip-bytebuffers-true | `34.54% <2.96%> (+6.81%)` |
| temurin | `34.55% <2.96%> (-27.20%)` |
| unittests | `34.55% <2.96%> (-27.20%)` |
| unittests1 | `?` |
| unittests2 | `34.55% <2.96%> (+6.82%)` |

Flags with carried forward coverage won't be shown.
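The patch and project percentages in this report follow from the raw counts. The Java check below reproduces them under a few assumptions. First, it assumes Codecov's usual definition, hits / (hits + misses + partials), so partial lines count as not covered. Second, it assumes the default two-decimal, round-down formatting, which is configurable in `codecov.yml`. Third, the split of 8 covered lines out of 270 in the patch is inferred from the 2.96296% and 262-missing figures rather than stated in the report. The helper name `percentRoundedDown` is hypothetical.

```java
import java.util.Locale;

class CoverageArithmetic {
  /**
   * Coverage as hits / tracked lines (tracked = hits + misses + partials), formatted with
   * two decimals and rounded down, assumed to match Codecov's default settings.
   */
  static String percentRoundedDown(long hits, long trackedLines) {
    // Integer math in hundredths of a percent, so the truncation is exact.
    long hundredths = hits * 10_000 / trackedLines;
    return String.format(Locale.ROOT, "%d.%02d%%", hundredths / 100, hundredths % 100);
  }

  public static void main(String[] args) {
    // Patch: 262 uncovered lines at 2.96296% implies 8 covered lines out of 270 (8/270 = 0.0296296...).
    System.out.println(percentRoundedDown(8, 270));        // 2.96%
    // Project: Hits / Lines from the coverage diff, BASE then HEAD.
    System.out.println(percentRoundedDown(82274, 133233)); // 61.75%
    System.out.println(percentRoundedDown(50742, 146824)); // 34.55% (exact value 34.5597...%)
    // Sanity check: Hits + Misses + Partials equals Lines on both sides.
    System.out.println(82274 + 44911 + 6048 == 133233 && 50742 + 91979 + 4103 == 146824); // true
  }
}
```

The HEAD figure shows why the rounding mode matters. The exact ratio is about 34.5597%, which round-half-up would report as 34.56%. The report's 34.55% matches round-down.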
