wmalpica opened 4 years ago
Design document here: https://docs.google.com/document/d/1vqMtAv8bNpw-ARTt-MPZwQQGZlSJUJb64hGspEHB8dE/edit?usp=sharing

UPDATE: this design document is no longer valid, nor a good idea, based on what has changed in the engine and what we have learned about Calcite.
query = """
WITH temp1 AS (
select n_nationkey, n_name from nation inner join region on n_nationkey = r_regionkey
), temp3 AS (
select n_nationkey, n_name from nation inner join region on n_nationkey = r_regionkey
), temp4 AS (
select t1.n_nationkey, t3.n_name from temp1 t1 inner join temp3 t3 on t1.n_nationkey = t3.n_nationkey
where t1.n_nationkey < 4 and t3.n_nationkey > 1
)
select * from temp4
"""
CALCITE - Optimized plan TEXT (default)
LogicalProject(n_nationkey=[$0], n_name=[$2])
  LogicalJoin(condition=[=($0, $1)], joinType=[inner])
    LogicalFilter(condition=[<($0, 4)])
      LogicalProject(n_nationkey=[$0])
        LogicalJoin(condition=[=($0, $2)], joinType=[inner])
          BindableTableScan(table=[[main, nation]], projects=[[0, 1]], aliases=[[n_nationkey, n_name]])
          BindableTableScan(table=[[main, region]], projects=[[0]], aliases=[[r_regionkey]])
    LogicalFilter(condition=[>($0, 1)])
      LogicalProject(n_nationkey=[$0], n_name=[$1])
        LogicalJoin(condition=[=($0, $2)], joinType=[inner])
          BindableTableScan(table=[[main, nation]], projects=[[0, 1]], aliases=[[n_nationkey, n_name]])
          BindableTableScan(table=[[main, region]], projects=[[0]], aliases=[[r_regionkey]])
JSON
{ "rels": [ { "id": "0", "relOp": "com.blazingdb.calcite.interpreter.BindableTableScan", "table": [ "main", "nation" ], "projects": [ 0, 1 ], "aliases": [ "n_nationkey", "n_name" ], "inputs": [] }, { "id": "1", "relOp": "com.blazingdb.calcite.interpreter.BindableTableScan", "table": [ "main", "region" ], "projects": [ 0 ], "aliases": [ "r_regionkey" ], "inputs": [] }, { "id": "2", "relOp": "LogicalJoin", "condition": { "op": { "name": "=", "kind": "EQUALS", "syntax": "BINARY" }, "operands": [ { "input": 0, "name": "$0" }, { "input": 2, "name": "$2" } ] }, "joinType": "inner", "inputs": [ "0", "1" ] }, { "id": "3", "relOp": "LogicalProject", "fields": [ "n_nationkey" ], "exprs": [ { "input": 0, "name": "$0" } ] }, { "id": "4", "relOp": "LogicalFilter", "condition": { "op": { "name": "<", "kind": "LESS_THAN", "syntax": "BINARY" }, "operands": [ { "input": 0, "name": "$0" }, { "literal": 4, "type": { "type": "INTEGER", "nullable": false } } ] } }, { "id": "5", "relOp": "LogicalProject", "fields": [ "n_nationkey", "n_name" ], "exprs": [ { "input": 0, "name": "$0" }, { "input": 1, "name": "$1" } ], "inputs": [ "2" ] }, { "id": "6", "relOp": "LogicalFilter", "condition": { "op": { "name": ">", "kind": "GREATER_THAN", "syntax": "BINARY" }, "operands": [ { "input": 0, "name": "$0" }, { "literal": 1, "type": { "type": "INTEGER", "nullable": false } } ] } }, { "id": "7", "relOp": "LogicalJoin", "condition": { "op": { "name": "=", "kind": "EQUALS", "syntax": "BINARY" }, "operands": [ { "input": 0, "name": "$0" }, { "input": 1, "name": "$1" } ] }, "joinType": "inner", "inputs": [ "4", "6" ] }, { "id": "8", "relOp": "LogicalProject", "fields": [ "n_nationkey", "n_name" ], "exprs": [ { "input": 0, "name": "$0" }, { "input": 2, "name": "$2" } ] } ] }
Graph representations will be attached
We should use Calcite to provide us the info we need to know which relational algebra nodes are shared. Through some experiments I have found the following: if we modify the function getRelationalAlgebraString in RelationalAlgebraGenerator.java to this:
public String
getRelationalAlgebraString(String sql) throws SqlSyntaxException, SqlValidationException, RelConversionException {
	String response = "";
	try {
		RelNode optimizedPlan = getRelationalAlgebra(sql);
		response = RelOptUtil.dumpPlan("", optimizedPlan, SqlExplainFormat.TEXT, SqlExplainLevel.NON_COST_ATTRIBUTES);
	} catch (SqlValidationException ex) {
		throw ex;
	} catch (SqlSyntaxException ex) {
		throw ex;
	} catch (Exception ex) {
		ex.printStackTrace();
		LOGGER.error(ex.getMessage());
		return "fail: \n " + ex.getMessage();
	}
	return response;
}
Then we can get a plan that looks like this:
LogicalProject(n_nationkey=[$0], n_name=[$2]), id = 654
  LogicalJoin(condition=[=($0, $1)], joinType=[inner]), id = 652
    LogicalFilter(condition=[<($0, 4)]), id = 673
      LogicalProject(n_nationkey=[$0]), id = 670
        LogicalProject(n_nationkey=[$0], n_name=[$1]), id = 664
          LogicalJoin(condition=[=($0, $2)], joinType=[inner]), id = 662
            BindableTableScan(table=[[main, nation]], projects=[[0, 1]], aliases=[[n_nationkey, n_name]]), id = 666
            BindableTableScan(table=[[main, region]], projects=[[0]], aliases=[[r_regionkey]]), id = 668
    LogicalProject(n_nationkey=[$0], n_name=[$1]), id = 647
      LogicalFilter(condition=[>($0, 1)]), id = 640
        LogicalProject(n_nationkey=[$0], n_name=[$1]), id = 664
          LogicalJoin(condition=[=($0, $2)], joinType=[inner]), id = 662
            BindableTableScan(table=[[main, nation]], projects=[[0, 1]], aliases=[[n_nationkey, n_name]]), id = 666
            BindableTableScan(table=[[main, region]], projects=[[0]], aliases=[[r_regionkey]]), id = 668
Notice that each node now has an id. When id numbers repeat within one plan, the corresponding nodes can be reused. And, compared to the JSON-based plan above, this would require only minimal changes to the relational algebra parsing logic currently in the engine.
Note that the suggestion here comes in part from what we learned experimenting with the work started here: https://github.com/BlazingDB/blazingsql/pull/704/files Look at that experimental PR to see which imports may be needed to implement the changes to the function.
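As a rough sketch of how the engine side could consume these ids (the function and type names below are hypothetical, not the engine's actual parser), each plan line can be split into its node expression and its id, and a set of already-seen ids tells us which nodes are shared and can be reused:

```cpp
#include <cstddef>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch: split a plan line such as
//   "LogicalJoin(condition=[=($0, $2)], joinType=[inner]), id = 662"
// into the node expression and its numeric id (-1 if no id is present).
std::pair<std::string, int> parse_plan_line(const std::string& line) {
    const std::string marker = ", id = ";
    std::size_t pos = line.rfind(marker);
    if (pos == std::string::npos) {
        return {line, -1};  // old plan format without ids
    }
    return {line.substr(0, pos), std::stoi(line.substr(pos + marker.size()))};
}

// For each plan line, report whether its id was already seen earlier in the
// plan, meaning that node (and its subtree) is shared and can be reused.
std::vector<bool> mark_shared_nodes(const std::vector<std::string>& plan_lines) {
    std::set<int> seen;
    std::vector<bool> shared;
    for (const auto& line : plan_lines) {
        int id = parse_plan_line(line).second;
        // insert(...).second is true only the first time an id appears
        shared.push_back(id >= 0 && !seen.insert(id).second);
    }
    return shared;
}
```

In the plan above, the second occurrences of ids 664, 662, 666 and 668 would be flagged as reusable.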
Using the new information we can get from Calcite, we can know which kernels should be reused. This means we now have kernels that can output to more than one kernel. There are many ways we can do that. To help describe the options, let's assume there is a kernel 1A that outputs to two kernels, 2A and 2B.
Option 1: We can have two CacheMachines (cmA and cmB, connecting 1A to 2A and 1A to 2B respectively), and each of the two kernels pulls from its own CacheMachine. Whenever 1A has an output, it is copied; one copy is added to cmA and the other to cmB.
Pros: This makes the implementation fairly simple. Each CacheMachine manages its own queue, and since we are making copies, we don't have to worry about managing the lifetime and state of a shared CacheData.
Cons: We have to make a copy of each output of 1A.
Implementation details: Right now kernels don't actually have the CacheMachines as members; instead they have an input port and an output port, and each port can have one or more CacheMachines. Right now access to those CacheMachines is public, but if we make it private, we can control adding to the output CacheMachine in such a way that if there is actually more than one CacheMachine (i.e. cmA and cmB), the port is the one that knows that and makes copies when necessary. The kernel itself does not have to worry about it.
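A minimal sketch of Option 1's port behavior, using stand-in types (CacheData, CacheMachine, OutputPort, register_cache and add_to_all are all simplified or hypothetical names, not the engine's real API): the port privately owns its CacheMachines and does the copying itself when more than one is attached, so the kernel only ever calls the port once per output.

```cpp
#include <cstddef>
#include <memory>
#include <queue>
#include <string>
#include <vector>

// Stand-ins for the engine's real types, just to illustrate the idea.
struct CacheData {
    std::string payload;  // pretend this is a batch of table data
};

struct CacheMachine {
    std::queue<std::unique_ptr<CacheData>> waiting_queue;
    void addToCache(std::unique_ptr<CacheData> data) {
        waiting_queue.push(std::move(data));
    }
};

// Option 1: the output port owns one CacheMachine per consumer kernel
// and makes copies when there is more than one attached.
class OutputPort {
public:
    void register_cache(std::shared_ptr<CacheMachine> cache) {
        caches_.push_back(std::move(cache));
    }
    // The kernel calls this once; copying is entirely the port's concern.
    void add_to_all(std::unique_ptr<CacheData> data) {
        for (std::size_t i = 0; i + 1 < caches_.size(); ++i) {
            // copy for every consumer except the last one
            caches_[i]->addToCache(std::make_unique<CacheData>(*data));
        }
        if (!caches_.empty()) {
            caches_.back()->addToCache(std::move(data));  // last gets the original
        }
    }
private:
    std::vector<std::shared_ptr<CacheMachine>> caches_;  // private, as proposed
};
```

Making the caches private, as suggested above, is what lets the port guarantee the copy happens without the kernel knowing how many consumers exist.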
Option 2: We modify CacheMachine so that it can handle pulls from more than one kernel. It would know beforehand how many kernels can pull from it. We would have to modify the WaitingQueue so that it is more like an array, and we would have a std::map<int, int> current_index so that each kernel id maps to the index of the element in the array it will get next. We would also have a pull_count for each element in the array. This way we would know which element each consuming kernel needs to pull next, and the pull_count would tell us when a pull is the last one we expect for a particular element. For example, if 2A pulls element 0, it gets a copy, but when 2B then pulls element 0, it gets the actual element.
Pros: This defers the copy to when we make a pull instead of when we first produce the output.
Cons: We still have to make a copy, and it is more complicated than Option 1.
Implementation details: Right now kernels don't actually have the CacheMachines as members; instead they have an input port and an output port. Right now access to those CacheMachines is public, but if we make it private, we can control how we pull from the CacheMachine, and the port can always supply the kernel_id to the pullFromCache function so that it uses the correct current_index for the kernel that is pulling. The kernel itself does not have to worry about it.
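To make Option 2 concrete, here is a sketch of the array-like WaitingQueue with a per-kernel current_index and a per-element pull_count. Everything besides those two names is hypothetical (no blocking, no bounds checking, no thread safety), so treat it as an illustration of the bookkeeping, not a real implementation:

```cpp
#include <map>
#include <memory>
#include <string>
#include <vector>

struct CacheData {
    std::string payload;  // stand-in for a batch of table data
};

// Option 2 sketch: one queue feeding num_consumers kernels. Each element
// stays in the array until every consumer has pulled it; earlier pulls get
// copies, and the last expected pull hands over the original.
class MultiConsumerQueue {
public:
    explicit MultiConsumerQueue(int num_consumers) : num_consumers_(num_consumers) {}

    void put(std::unique_ptr<CacheData> data) {
        elements_.push_back(std::move(data));
        pull_count_.push_back(0);
    }

    // kernel_id selects which consumer's cursor (current_index) advances.
    std::unique_ptr<CacheData> pull(int kernel_id) {
        int idx = current_index_[kernel_id]++;  // element this kernel gets next
        if (++pull_count_[idx] < num_consumers_) {
            return std::make_unique<CacheData>(*elements_[idx]);  // copy
        }
        return std::move(elements_[idx]);  // last expected pull: original
    }

private:
    int num_consumers_;
    std::vector<std::unique_ptr<CacheData>> elements_;  // array-like WaitingQueue
    std::vector<int> pull_count_;                       // pulls seen per element
    std::map<int, int> current_index_;                  // kernel id -> next index
};
```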
We should go with Option 1.
I have been thinking about the implementation details, and there are a couple of things to keep in mind.
Right now, for distribution kernels, there are functions such as scatter and broadcast that take a CacheMachine as input. These will need to be modified to take a port id instead, so that the function can internally call the new port APIs, which will put the data into the correct port and copy the data into different CacheMachines if more than one is connected to that port.
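As an illustration of that signature change (Port, add_to_port, and this scatter signature are all stand-ins, not the engine's actual API, and the real functions deal with partitioned GPU tables rather than strings): the distribution function no longer touches a CacheMachine directly; it hands data to the port under a port id, and the port fans it out to every attached cache.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in: a port that routes a batch to every cache attached
// under a given port id; copying into each cache is handled inside the port.
struct Port {
    // port id -> list of caches, each cache modeled as a list of batches
    std::map<std::string, std::vector<std::vector<std::string>>> caches_by_id;
    void add_to_port(const std::string& port_id, const std::string& batch) {
        for (auto& cache : caches_by_id[port_id]) {
            cache.push_back(batch);  // the port copies into each attached cache
        }
    }
};

// scatter now receives the port and a port id instead of a CacheMachine,
// mirroring the change proposed above (signature is illustrative only).
void scatter(const std::vector<std::string>& partitions, Port& port,
             const std::string& port_id) {
    for (const auto& partition : partitions) {
        port.add_to_port(port_id, partition);
    }
}
```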
Additionally, the logic that receives a message sent to a specific cache (i.e. send_message with specific_cache == true) needs to also use ports.
Tasks for this: