microsoft / hyperspace

An open source indexing subsystem that brings index-based query acceleration to Apache Spark™ and big data workloads.
https://aka.ms/hyperspace
Apache License 2.0

Support joins over nested fields with indexes on nested fields #381

Open andrei-ionescu opened 3 years ago

andrei-ionescu commented 3 years ago

What is the context for this pull request?

What changes were proposed in this pull request?

This PR adds support for hybrid scans when filtering on nested fields, using indexes created over nested fields.

Given the nestedDataset dataset with the following schema

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- nested: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- nst: struct (nullable = true)
 |    |    |-- field1: string (nullable = true)
 |    |    |-- field2: string (nullable = true)

and the following data

+---+-----+-----------------+
|id |name |nested           |
+---+-----+-----------------+
|2  |name2|[va2, [wa2, wb2]]|
|1  |name1|[va1, [wa1, wb1]]|
+---+-----+-----------------+
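
For reference, a minimal sketch of how such an index and query might look with the Hyperspace Scala API. This is not code from this PR; the index name idx_nested and the column names are taken from the plans below, and nestedDataset is the dataset described above.

```scala
import org.apache.spark.sql.functions.col
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index._

val hs = new Hyperspace(spark)

// Create a covering index keyed on the nested leaf field, including the
// other columns that the example query projects.
hs.createIndex(
  nestedDataset,
  IndexConfig(
    "idx_nested",
    indexedColumns = Seq("nested.nst.field1"),
    includedColumns = Seq("id", "name", "nested.nst.field2")))

// Allow Hyperspace to rewrite eligible query plans.
spark.enableHyperspace()

// The filter/projection used throughout the plans in this description.
val query = nestedDataset
  .filter(col("nested.nst.field1") === "wa1")
  .select("id", "name", "nested.nst.field2")
```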

The transformed plans for hybrid scans need to accommodate the union between the latest files arriving in the dataset, which have the schema above, and the index schema below

root
 |-- __hs_nested.nested.nst.field1: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- __hs_nested.nested.nst.field2: string (nullable = true)
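
Both scenarios below assume hybrid scan is enabled so Hyperspace can still use idx_nested after the source data changes. A hedged sketch of that setup; the configuration key is Hyperspace's documented hybrid scan flag, and newRows and the appended path are illustrative only.

```scala
// Assumption: hybrid scan is turned on via its documented configuration key.
spark.conf.set("spark.hyperspace.index.hybridscan.enabled", "true")

// "Files Appended": new rows (a hypothetical DataFrame with the same schema
// as nestedDataset) land in the source table after the index was built, so
// the rewritten plan must union the index data with a plain scan of the
// appended files (illustrative path).
newRows.write.mode("append").parquet("/..../tableN2")
```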

Files Appended

The optimized plan

Union
:- Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
:  +- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
:     +- Relation[__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3] 
:        Hyperspace(Type: CI, Name: idx_nested, LogVersion: 1)
+- Project [id#100, name#101, nested#102.nst.field2]
   +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
      +- Relation[id#100,name#101,nested#102] parquet

The Spark plan

Union
:- Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
:  +- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
:     +- FileScan parquet [__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3] 
:        Batched: false, Format: Parquet, 
:        Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0], 
:        PartitionFilters: [], PushedFilters: [IsNotNull(__hs_nested.nested.nst.field1)], ReadSchema: 
:        struct<__hs_nested.nested.nst.field1:string,id:int,name:string,__hs_nested.nested.nst.field2:string>
+- Project [id#100, name#101, nested#102.nst.field2]
   +- Filter (isnotnull(nested#102) && (nested#102.nst.field1 = wa1))
      +- FileScan parquet [id#100,name#101,nested#102] 
         Batched: false, Format: Parquet, 
         Location: InMemoryFileIndex[file:/..../tableN2/appended_file.parquet], 
         PartitionFilters: [], PushedFilters: [IsNotNull(nested)], ReadSchema: 
         struct<id:int,name:string,nested:struct<field1:string,nst:struct<field1:string,field2:string>>>
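
To confirm which branches of the union read index files versus appended source files, the Hyperspace explain API can be used. A usage sketch; query refers to the filter/select query sketched earlier.

```scala
// Prints the plan with and without the index applied; verbose output also
// lists the index files and source files each FileScan reads.
hs.explain(query, verbose = true)
```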

Files Deleted

The optimized plan

Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
+- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
   +- Project [__hs_nested.nested.nst.field1#0, id#1, name#2, __hs_nested.nested.nst.field2#3]
      +- Filter NOT (_data_file_id#368L = 3)
         +- Relation[__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3,_data_file_id#368L] 
            Hyperspace(Type: CI, Name: indexWithLineage, LogVersion: 1)

The Spark plan

Project [id#1, name#2, __hs_nested.nested.nst.field2#3]
+- Filter (isnotnull(__hs_nested.nested.nst.field1#0) && (__hs_nested.nested.nst.field1#0 = wa1))
   +- Project [__hs_nested.nested.nst.field1#0, id#1, name#2, __hs_nested.nested.nst.field2#3]
      +- Filter NOT (_data_file_id#368L = 3)
         +- FileScan parquet [__hs_nested.nested.nst.field1#0,id#1,name#2,__hs_nested.nested.nst.field2#3] 
            Batched: false, Format: Parquet, 
            Location: InMemoryFileIndex[file:/..../spark_warehouse/indexes/idx_nested/v__=0], 
            PartitionFilters: [], PushedFilters: [IsNotNull(__hs_nested.nested.nst.field1)], ReadSchema: 
            struct<__hs_nested.nested.nst.field1:string,id:int,name:string,__hs_nested.nested.nst.field2:string>
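
The deleted-files case relies on index lineage: each index row records the source file it came from in the _data_file_id column, so rows originating from deleted files can be excluded. A hedged sketch of that setup, assuming Hyperspace's documented lineage configuration key; the index name indexWithLineage matches the plan above.

```scala
// Assumption: lineage must be enabled before the index is created so that
// the index stores the _data_file_id lineage column.
spark.conf.set("spark.hyperspace.index.lineage.enabled", "true")

hs.createIndex(
  nestedDataset,
  IndexConfig(
    "indexWithLineage",
    indexedColumns = Seq("nested.nst.field1"),
    includedColumns = Seq("id", "name", "nested.nst.field2")))

// After a source file is deleted, the rewritten plan keeps using the index
// and adds a Filter NOT (_data_file_id = ...) over the lineage column,
// as shown in the plans above.
```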

Does this PR introduce any user-facing change?

No.

How was this patch tested?

andrei-ionescu commented 3 years ago

@sezruby, @imback82 Here is the third PR, which completes support for nested fields in filters.