Open andrei-ionescu opened 3 years ago
Looks good! Could you also consider
df.filter(df("nested.nst.field1") === "wa1").select("id", "name", "nested.nst.fieldId1")
?
We might need to replace all "nested#102.nst.field1" occurrences at upper levels of the plan tree, not just in (Project => Filter => Relation). It would be good if you could verify, with some test code, that replacing attributes works for all plan nodes.
+1 as well. Could you capture that nested fields can also be supported for included columns?
+1 Looks awesome! Thanks @andrei-ionescu!
I have a few minor questions/nits:
hs.createIndex(
df,
IndexConfig(
"idx_nested",
indexedColumns = Seq("nested.nst.field1"),
includedColumns = Seq("id", "name", "nested.nst.field1")
)
)
Did you mean nested.nst.field2 in includedColumns?
hs.createIndex(df, IndexConfig("idx_nested", indexedColumns = Seq("nested.nst.field1"), includedColumns = Seq("id", "name")))
should have nested.nst.field2 in the included columns (and the appropriate name change reflected in the example right below).
Join queries with hybrid scans do get even more complex.
Could you add more details (similar to how you did for Hybrid scan) for completeness?
At first, it was intentional to project the field that we search by (the indexed field), but I switched to your suggestion as it seems to be the more common usage.
I'll add details for joins too, as I gain a better understanding of how the plans get composed.
Sounds good, thank you!
@rapoth, @imback82, @sezruby: I added a few more plans for hybrid scans - one for the append files and one for deleted files. Let me know if there is something off with them.
@rapoth, @imback82, @sezruby: There is an improvement that I need to address: the fromDifferentBaseRelations method inside ensureAttributeRequirements needs to be modified.
Given the nested dataset
root
|-- Date: string (nullable = true)
|-- RGUID: string (nullable = true)
|-- Query: string (nullable = true)
|-- imprs: integer (nullable = true)
|-- clicks: integer (nullable = true)
|-- nested: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- leaf: struct (nullable = true)
| | |-- id: string (nullable = true)
| | |-- cnt: integer (nullable = true)
and this join
val query2 = df1.filter(df1("nested.leaf.cnt") >= 20).select(df1("nested.leaf.cnt"), df1("query"))
val query = df2.filter(df2("nested.leaf.cnt") <= 40).select(df2("nested.leaf.cnt"), df2("Date"))
query2.join(query, "cnt")
resulting in the following join plan over nested fields
Join Inner, (cnt#556 = cnt#560)
:- Project [nested#536.leaf.cnt AS cnt#556, query#533]
: +- Filter ((isnotnull(nested#536) && (nested#536.leaf.cnt >= 20)) && (nested#536.leaf.cnt <= 40))
: +- Relation[Date#531,RGUID#532,Query#533,imprs#534,clicks#535,nested#536] parquet
+- Project [nested#548.leaf.cnt AS cnt#560, Date#543]
+- Filter ((isnotnull(nested#548) && (nested#548.leaf.cnt <= 40)) && (nested#548.leaf.cnt >= 20))
+- Relation[Date#543,RGUID#544,Query#545,imprs#546,clicks#547,nested#548] parquet
the fromDifferentBaseRelations method will return false, so the join will not meet the criteria to qualify for plan rewrite. This is because the fields have different ids (nested#536.leaf.cnt AS cnt#556 vs nested#548.leaf.cnt AS cnt#560) and the match is executed on ids, not on names.
Canonicalised fields:
none#556
none#560
List(none#531, none#532, none#533, none#534, none#535, none#536)
List(none#543, none#544, none#545, none#546, none#547, none#548)
Do you see any problems if I modify fromDifferentBaseRelations to check fields by name instead of id, taking into account the case sensitivity setting?
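For illustration, the name-based check being proposed could look roughly like the sketch below. This is a standalone toy model, not the actual Hyperspace code: FieldRef and the caseSensitive flag are hypothetical names standing in for Catalyst attributes and the spark.sql.caseSensitive setting.

```scala
// Hypothetical, simplified model of an attribute. Real Catalyst attributes
// also carry an exprId, which this name-based sketch deliberately ignores.
case class FieldRef(name: String)

object NameBasedMatch {
  // Mirrors Spark's case sensitivity behaviour: compare names exactly
  // when case-sensitive, otherwise ignore case.
  def sameField(a: FieldRef, b: FieldRef, caseSensitive: Boolean): Boolean =
    if (caseSensitive) a.name == b.name
    else a.name.equalsIgnoreCase(b.name)
}
```

With this scheme, nested#536.leaf.cnt and nested#548.leaf.cnt would compare equal by name ("nested.leaf.cnt") even though their ids differ.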
@andrei-ionescu I think the problem is that the canonicalized value of the condition's left side is none#556, not none#536. Could you try to fix the canonicalization of the condition expression properly (if possible), instead of using names?
@sezruby I don't know if that is possible without other changes. The ensureAttributeRequirements(l: FileBasedRelation, r: FileBasedRelation, condition: Expression): Boolean method works with FileBasedRelation and the join condition Expression. If you look at the plan above, the condition uses the aliases (cnt#556 = cnt#560) that come from the projection (i.e. nested#536.leaf.cnt AS cnt#556), but the FileBasedRelation has no way to connect #556 to #536 because the projection step/layer is not taken into account.
I just want to mention that the plan listed above is what Spark produces by itself - Spark adds those aliases in this join case.
My approach was not to inject the projection into ensureAttributeRequirements but to check the requirements based on field names.
Your suggestion is to keep the canonicalisation, but to do that we need to go through the projection layers to build the lineage from the join condition down to the relation fields. I'll try adding a function that transforms the canonicalized condition fields into relation fields and use it inside the fromDifferentBaseRelations method.
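Such a transformation might look roughly like the toy sketch below. Attr, Alias, and Project are stand-ins for the Catalyst nodes (an attribute's id models the expression id, e.g. #556), not real Hyperspace code; walking the Project layers builds the alias-to-base mapping that the condition check is missing.

```scala
// Toy model of the lineage problem: the join condition references the
// alias ids (#556, #560), while the relations only know the base ids
// (#536, #548). The Project layers hold the missing link.
case class Attr(name: String, id: Int)
case class Alias(child: Attr, name: String, id: Int)
case class Project(aliases: Seq[Alias])

object Lineage {
  // Rewrites a condition attribute to the relation attribute it was
  // aliased from; attributes that are not aliases pass through untouched.
  def resolveThroughProjections(attr: Attr, projections: Seq[Project]): Attr = {
    val aliasMap: Map[Int, Attr] =
      projections.flatMap(_.aliases).map(a => a.id -> a.child).toMap
    aliasMap.getOrElse(attr.id, attr)
  }
}
```

In the plan above, cnt#556 would resolve back to the base expression over nested#536, which is what fromDifferentBaseRelations needs in order to compare against the relation output.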
The next step will be modifying getBestIndexPair, which makes use of field names.
Matching on field names might cause confusion/problems if the same column names exist on both the left and the right side, so I think we should keep trying to use the id.
@rapoth, @imback82, @sezruby: kindly have a look into it.
Problem description
This design proposal is for adding support for nested fields in both indexedColumns and includedColumns. Currently, Hyperspace does not support nested fields such as structs or structured arrays.
Having the possibility to use nested fields with Hyperspace would be of great benefit to everybody.
Goal
The user should be able to create an index with nested fields and use it in queries.
For example:
Proposed solution
Using the same Hyperspace APIs, indexes over nested fields should work the same way as for other simple fields.
Given the nestedDataset dataset with the schema and the following data, this is how an index on the field1 field with two other projection fields can be created.
Adding support for nested fields impacts the following areas:
Creating the index
Given the dataset defined above with the listed data, after doing
the following dataset will be created
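Because Parquet does not allow the dot character in column names, the nested path has to be stored in the index under a dot-free name and mapped back to the original path at query time. A minimal sketch of that renaming follows; the "__" separator here is purely an assumption for illustration, not Hyperspace's actual naming scheme.

```scala
object NestedFieldNames {
  // Parquet rejects '.' in column names, so a nested path such as
  // "nested.nst.field1" is stored under a flattened, dot-free name.
  // The "__" separator is an assumption for this sketch; it also assumes
  // original field names never contain "__" themselves.
  def flatten(path: String): String = path.replace(".", "__")

  // Inverse mapping, applied when projecting the index column back
  // under its original nested path at query time.
  def restore(flat: String): String = flat.replace("__", ".")
}
```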
It is important to understand that the index column's field name is a non-nested column and, due to Parquet's quirkiness about using the . (dot) character in field names, it has to be properly renamed and, at query time, projected back as it was.
Search query
Given the following search/filter query
The optimized and spark plans without index are
The transformed optimized and spark plans should look like
Complexities
Transforming the plan
Filters inside the plan must be modified to accommodate the index schema, not the data schema - the flattened schema instead of the nested fields. Instead of accessing the field with GetStructField(GetStructField(AttributeReference)), it must access it directly with an AttributeReference.
Given the query plan
The filter must be modified from
to
The projection from
to
The relation from
to
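Conceptually, the filter rewrite collapses every nested-access chain into a single direct attribute reference on the flattened index column. The sketch below shows this on a tiny stand-in for Catalyst's expression tree (Expr, AttributeReference, and GetStructField here are simplified models, not the real Spark classes).

```scala
// Minimal stand-in for the Catalyst expressions involved in the rewrite.
sealed trait Expr
case class AttributeReference(name: String) extends Expr
case class GetStructField(child: Expr, field: String) extends Expr

object PlanRewrite {
  // Collapses GetStructField(GetStructField(AttributeReference)) chains
  // into a single AttributeReference whose name is the dotted path,
  // i.e. the column the index actually stores.
  def flattenAccess(e: Expr): Expr = e match {
    case GetStructField(child, field) =>
      flattenAccess(child) match {
        case AttributeReference(name) => AttributeReference(s"$name.$field")
        case other                    => GetStructField(other, field)
      }
    case other => other
  }
}
```

For example, the access nested.leaf.cnt in the filter becomes one attribute reference named "nested.leaf.cnt", which can then be bound to the index column.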
Hybrid scans
The transformed plans for Hybrid scans will need to accommodate the union between the latest files arriving in the dataset, which have the following schema
and the index schema
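Before that union, the freshly appended rows (still in the nested data schema) have to be projected into the flattened index schema so both sides line up column for column. The case classes below are hypothetical stand-ins for the two schemas, loosely modeled on the nested dataset used in this proposal, just to show the shape mismatch.

```scala
// Stand-ins for the two schemas being unioned in a hybrid scan:
// the original nested data schema and the flattened index schema.
case class Leaf(id: String, cnt: Int)
case class Nested(id: String, leaf: Leaf)
case class DataRow(date: String, query: String, nested: Nested)
case class IndexRow(date: String, query: String, nestedLeafCnt: Int)

object HybridScan {
  // Projects an appended nested row into the index schema, pulling the
  // indexed nested field up to a top-level column, so the union of
  // appended data and index data is well-typed.
  def toIndexSchema(r: DataRow): IndexRow =
    IndexRow(r.date, r.query, r.nested.leaf.cnt)
}
```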
Search queries
Files Appended
The optimized plan
The Spark plan
Files Deleted
The optimized plan
The Spark plan
Join queries
The following join queries use a dataset a bit different from the one at the beginning. The following plans are extracted from the HybridScanForNestedFieldsTest tests.
Join append only
Original plan
Altered optimized plan
Altered Spark plan
Delete files
Original plan
Altered optimized plan
Altered Spark plan
Append + Delete
Original plan
Altered optimized plan
Altered Spark plan
Impact
There should be no impact on the existing Hyperspace implementation.
Background
This is a result of #312 discussion.