microsoft / hyperspace

An open source indexing subsystem that brings index-based query acceleration to Apache Spark™ and big data workloads.
https://aka.ms/hyperspace
Apache License 2.0

Explain output shows indexes picked but doesn't show them in "Indexes Used" section #252

Open apoorvedave1 opened 4 years ago

apoorvedave1 commented 4 years ago

Describe the issue

The index is used in the query, and the explain output shows the plan modification correctly, but the "Indexes used" section is blank.

To Reproduce

// create data
// create index
// append data
// refresh index incremental
val query = filter1.join(filter2, "name")
hyperspace.explain(query)
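
For completeness, a minimal sketch of the commented steps above (the data path, index name, and column values are hypothetical, and the exact refreshIndex signature may differ slightly across Hyperspace versions):

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.IndexConfig
import spark.implicits._

// Hypothetical data location; adjust to your environment.
val dataPath = "/tmp/hyperspace-repro/productTable"

// create data
Seq(("banana", 5, "2020-01-01"), ("apple", 20, "2020-01-01"))
  .toDF("name", "qty", "date")
  .write.mode("overwrite").parquet(dataPath)

val hyperspace = new Hyperspace(spark)

// create index
hyperspace.createIndex(
  spark.read.parquet(dataPath),
  IndexConfig("productIndex", indexedColumns = Seq("name"), includedColumns = Seq("qty", "date")))

// append data
Seq(("banana", 15, "2020-01-02")).toDF("name", "qty", "date")
  .write.mode("append").parquet(dataPath)

// refresh index incremental
hyperspace.refreshIndex("productIndex", "incremental")

// build the two filtered sides and run explain
val df = spark.read.parquet(dataPath)
val filter1 = df.filter("name = 'banana'")
val filter2 = df.filter("qty > 10 and name = 'banana'")
val query = filter1.join(filter2, "name")
hyperspace.explain(query)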

Output

query: org.apache.spark.sql.DataFrame = [name: string, qty: int ... 3 more fields]
=============================================================
Plan with indexes:
=============================================================
Project [name#780, qty#781, date#782, qty#1108, date#1109]
+- SortMergeJoin [name#780], [name#1107], Inner
   :- *(1) Sort [name#780 ASC NULLS FIRST], false, 0
   <----:  +- *(1) Project [name#780, qty#781, date#782]---->
   <----:     +- *(1) Filter (isnotnull(name#780) && (name#780 = banana))---->
   <----:        +- *(1) FileScan parquet [name#780,date#782,qty#781] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://apdave@charmbenchmark.dfs.core.windows.net/synapse/workspaces/apdavew/w..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct<name:string,date:string,qty:int>, SelectedBucketsCount: 1 out of 200---->
   +- *(2) Sort [name#1107 ASC NULLS FIRST], false, 0
      <----+- *(2) Project [name#1107, qty#1108, date#1109]---->
         <----+- *(2) Filter (((isnotnull(qty#1108) && (qty#1108 > 10)) && isnotnull(name#1107)) && (name#1107 = banana))---->
            <----+- *(2) FileScan parquet [name#1107,date#1109,qty#1108] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://apdave@charmbenchmark.dfs.core.windows.net/synapse/workspaces/apdavew/w..., PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct<name:string,date:string,qty:int>, SelectedBucketsCount: 1 out of 200---->

=============================================================
Plan without indexes:
=============================================================
Project [name#780, qty#781, date#782, qty#1108, date#1109]
+- SortMergeJoin [name#780], [name#1107], Inner
   :- *(2) Sort [name#780 ASC NULLS FIRST], false, 0
   <----:  +- Exchange hashpartitioning(name#780, 200), [id=#1274]---->
   <----:     +- *(1) Project [name#780, qty#781, date#782]---->
   <----:        +- *(1) Filter (isnotnull(name#780) && (name#780 = banana))---->
   <----:           +- *(1) FileScan parquet [name#780,qty#781,date#782] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://apdave@charmbenchmark.dfs.core.windows.net/path/to/productTable], PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct<name:string,qty:int,date:string>---->
   +- *(4) Sort [name#1107 ASC NULLS FIRST], false, 0
      <----+- Exchange hashpartitioning(name#1107, 200), [id=#1280]---->
         <----+- *(3) Project [name#1107, qty#1108, date#1109]---->
            <----+- *(3) Filter (((isnotnull(qty#1108) && (qty#1108 > 10)) && isnotnull(name#1107)) && (name#1107 = banana))---->
               <----+- *(3) FileScan parquet [name#1107,qty#1108,date#1109] Batched: true, Format: Parquet, Location: InMemoryFileIndex[abfss://apdave@charmbenchmark.dfs.core.windows.net/path/to/productTable], PartitionFilters: [], PushedFilters: [IsNotNull(qty), GreaterThan(qty,10), IsNotNull(name), EqualTo(name,banana)], ReadSchema: struct<name:string,qty:int,date:string>---->

=============================================================
Indexes used:
=============================================================

Expected behavior

Environment

rohitashJain commented 4 years ago

Hi @apoorvedave1, I am new to Hyperspace and keen to contribute.

I was trying to figure out the root cause. I think the issue can be traced to this line of code:

val usedIndexes = indexes.filter(indexes("indexLocation").isin(getPaths(plan): _*))

The indexLocation is updated after the incremental index refresh.

After the update, the indexLocation path was stripped of the version suffix at the end; it came out as just the indexRootPath.

Ideally, it should have been IndexRootPath/v__=1
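
To make the mismatch concrete, here is a hypothetical illustration (paths made up) of why the isin filter in explain then matches nothing:

// Hypothetical values after an incremental refresh.
val indexLocation = "/systempath/myIndex"             // what hyperspace.indexes reports in "indexLocation"
val pathsFromPlan = Seq("/systempath/myIndex/v__=1")  // what getPaths(plan) extracts from the optimized plan

// indexes.filter(indexes("indexLocation").isin(getPaths(plan): _*)) then reduces to a check like:
pathsFromPlan.contains(indexLocation)                 // false, so the "Indexes used" section stays empty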

This requires a better understanding of the core code. I will try to spend some more time and figure it out. Let me know if you have any pointers.

apoorvedave1 commented 4 years ago

Thanks @rohitashJain, that's a very helpful triage. I think we can make this more robust by stripping the v__=# section completely in the hyperspace.indexes API. As you pointed out, the behavior of hyperspace.indexes is different before and after refresh: before refresh we see v__=0 in the "indexLocation" column, and after refresh we don't see v__=1 at all.

This looks like a bug. To make this consistent, one option is to fix the private def indexDirPath method.

Problem: indexDirPath returns different outputs before and after an incremental index refresh. When all index files belong to the same directory, it appends v__=# to the returned value; when index files belong to different directories, it strips away the v__ part.

Possible Solution: To make the behavior consistent across refreshes, we should always strip away the v__= part from the output. One way to do this is to use the spark.conf.get("spark.hyperspace.system.path") config: take the index system path and derive the index root path directly from it. For example:

Case 1: Index files: /systempath/myIndex/v__=0/f1, /systempath/myIndex/v__=1/f2

Case 2: Index files: /systempath/myIndex/v__=0/f1

Output (the same regardless of whether the index files are in one folder or several): return the root index folder with the version stripped away, i.e. /systempath/myIndex/
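
A rough sketch of that idea (the helper name indexRootPath is hypothetical; only the spark.hyperspace.system.path config comes from the comment above):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Hypothetical helper: walk up from an index file path until the parent is the
// configured index system path; that directory is the index root, without v__=#.
// Assumes the file actually lives under the system path.
def indexRootPath(indexFilePath: Path, spark: SparkSession): Path = {
  val systemPath = new Path(spark.conf.get("spark.hyperspace.system.path"))
  var current = indexFilePath
  while (current.getParent != null && current.getParent != systemPath) {
    current = current.getParent
  }
  current
}

// Case 1: indexRootPath(new Path("/systempath/myIndex/v__=1/f2"), spark) => /systempath/myIndex
// Case 2: indexRootPath(new Path("/systempath/myIndex/v__=0/f1"), spark) => /systempath/myIndex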

cc @imback82

apoorvedave1 commented 4 years ago

Then we can move on to the hyperspace.explain API fix:

As noted above, the issue can be traced to this line of code:

val usedIndexes = indexes.filter(indexes("indexLocation").isin(getPaths(plan): _*))

We can then update getPaths(plan) to return index root paths without the v__= suffix, and remove duplicates.
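
For instance, a minimal sketch of that normalization (normalizeToIndexRoots is a hypothetical helper; the real getPaths traversal is omitted):

import org.apache.hadoop.fs.Path

// Strip a trailing "v__=<n>" directory from each path extracted from the plan,
// then deduplicate, so the values line up with the indexLocation root paths.
def normalizeToIndexRoots(paths: Seq[String]): Seq[String] = {
  paths.map { p =>
    val path = new Path(p)
    if (path.getName.startsWith("v__=")) path.getParent.toString else p
  }.distinct
}

// val usedIndexes = indexes.filter(indexes("indexLocation").isin(normalizeToIndexRoots(getPaths(plan)): _*))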

apoorvedave1 commented 4 years ago

Thanks @rohitashJain, please let me know if I was unclear or if I can help in some way.

apoorvedave1 commented 4 years ago

@rohitashJain https://github.com/microsoft/hyperspace/issues/251 this one could be fixed first. What do you think?

rohitashJain commented 4 years ago

@apoorvedave1 Looks good to me. As you suggested, we would need to fix two pieces:

  1. private def indexDirPath(entry: IndexLogEntry) - add a condition to the while loop: && !root.subdir.head.name.startsWith(INDEX_VERSION_DIRECTORY_PREFIX)
  2. Fix the getPath function - instead of location.rootPaths.head.getParent.toString, use location.rootPaths.head.getParent.getParent.toString (an extra getParent call; see the sketch below)
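
For reference, here is how the two calls differ on a hypothetical index file path:

import org.apache.hadoop.fs.Path

val head = new Path("/systempath/myIndex/v__=1/f2")  // hypothetical location.rootPaths.head
head.getParent.toString            // "/systempath/myIndex/v__=1" (current: versioned folder)
head.getParent.getParent.toString  // "/systempath/myIndex"       (proposed: index root folder)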

But can we instead fix only private def indexDirPath(entry: IndexLogEntry) so that it always returns the latestVersionPath?

Actually, it's about optics. In the hyperspace.indexes DataFrame, the indexes("indexLocation") column gives us the location of the index. The question is whether it should be the index root directory or the actual versioned location of the index. If we go with the index root directory, should we also add another field to the IndexSummary case class to carry the version information for debugging? The version information is sometimes helpful when debugging, and it would be good to retain it.
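
A rough sketch of what that could look like (trimmed down; the real IndexSummary has more fields, and the new field name is hypothetical):

// Trimmed-down, hypothetical shape of IndexSummary with an extra field for the version.
case class IndexSummary(
    name: String,
    indexLocation: String,  // index root directory, without the v__=# suffix
    indexVersion: Int)      // hypothetical new field, e.g. 1 for v__=1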

I am interested in contributing a fix for this bug. Let me know your thoughts and I can make the changes accordingly.