microsoft / hyperspace

An open source indexing subsystem that brings index-based query acceleration to Apache Spark™ and big data workloads.
https://aka.ms/hyperspace
Apache License 2.0

[PROPOSAL]: Data Skipping Indexes #441

Open clee704 opened 3 years ago

clee704 commented 3 years ago

Problem Statement

Add support for data skipping indexes.

Background and Motivation

Hyperspace currently supports only hash-partitioned covering indexes. Covering indexes are good for some workloads, but we should have more index types to facilitate a broader range of use cases.

Proposed Solution

Refactor the existing code by moving covering-index-specific code into a single implementation of a new Index API, then implement data skipping indexes against that API.

Alternatives

The proposed solution requires heavy refactoring, as there is a lot of code specific to covering indexes scattered throughout the codebase. Alternatively, we can make DataSkippingIndex without introducing a new API and write many if-elses depending on the index type. It might be quick to implement the required features for now in this way, but it will make the code hard to maintain in the long run.

Known/Potential Compatibility Issues

Regarding data compatibility, it is very likely that this will make the new version of Hyperspace incompatible with indexes created in the older versions. This is unfortunate but can be excused as Hyperspace is still in version zero. If a strong need arises, this can be mitigated by a migration utility that can be designed separately.

API compatibility can be retained if we introduce new optional arguments to IndexConfig. However, includedColumns doesn't make sense for data skipping indexes, and the newly added arguments won't make sense for covering indexes. Therefore, it might be wise to break compatibility now and make things clean for the future.

Design

Data Skipping Index Basics

Data skipping indexes can filter out certain files that cannot have the desired values in indexed columns based on statistics or sketches on the values in files. Examples of statistics or sketches include min/max values and bloom filters.

Suppose that we have sample data like below. There is a relation df which has two columns, a and b, and consists of two files, /data/p0 and /data/p1:

/data/p0:
a   b
1   2
2   4
6   6

/data/p1:
a   b
5   10
10  10

A min-max data skipping index on a column a will look like this:

fileid  min  max
0       1    6
1       5    10

The file ID column is used to map files to unique identifiers. Identifiers are created based on the file path, size, and modification time.

Then, when a query like df.filter($"a" < 4) is given, we know that we can skip the /data/p1 file.

Note that the real implementation will contain file size and modified time columns for files, to distinguish files with the same name but different sizes/modified times to support deleted/added source files.
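The example above can be reproduced with a minimal Python sketch. This is only an illustration of the idea, not Hyperspace code: the names `MinMaxRow`, `build_min_max_index`, and `files_for_less_than` are hypothetical, and file IDs are simply assigned in iteration order rather than derived from path, size, and modification time.

```python
from typing import Dict, List, NamedTuple, Tuple


class MinMaxRow(NamedTuple):
    file_id: int
    min_value: int
    max_value: int


# Values of column a in each file, taken from the sample data above.
files: Dict[str, List[int]] = {
    "/data/p0": [1, 2, 6],
    "/data/p1": [5, 10],
}


def build_min_max_index(files: Dict[str, List[int]]) -> Tuple[List[str], List[MinMaxRow]]:
    """Build one index row per file; the file ID is the position in `paths`."""
    paths = list(files)
    index = [MinMaxRow(i, min(files[p]), max(files[p])) for i, p in enumerate(paths)]
    return paths, index


def files_for_less_than(paths: List[str], index: List[MinMaxRow], bound: int) -> List[str]:
    """Keep only files that may contain a value < bound, i.e. files with min < bound."""
    return [paths[row.file_id] for row in index if row.min_value < bound]


paths, index = build_min_max_index(files)
print(files_for_less_than(paths, index, 4))  # ['/data/p0']
```

For the filter a < 4, /data/p1 is dropped because its minimum (5) is not below 4, which matches the reasoning above.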

Storing Index Data

The index data can be stored as parquet files like it is done for covering indexes. The difference here is that we don't need bucketing. Also, the data size is a lot less because there is only a single row for each file.

To speed up range queries like the last example, it might be necessary that the data is cached in the main memory as sorted sequences ordered by min and max. This should be cheap to compute as the data size is expected to be small.
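One possible shape for such a cache, sketched in Python under the assumption that the whole index fits in memory: keep one sequence sorted by min and one sorted by max, and answer range filters with a binary search. The function names are hypothetical and the rows are the ones from the min-max example above.

```python
from bisect import bisect_left, bisect_right

# (file_id, min, max) rows from the example index.
rows = [(0, 1, 6), (1, 5, 10)]

# Two sorted views of the same index data.
by_min = sorted((mn, fid) for fid, mn, _ in rows)
by_max = sorted((mx, fid) for fid, _, mx in rows)
min_keys = [key for key, _ in by_min]
max_keys = [key for key, _ in by_max]


def candidates_less_than(bound):
    """Files with min < bound form a prefix of the sequence sorted by min."""
    end = bisect_left(min_keys, bound)
    return {fid for _, fid in by_min[:end]}


def candidates_greater_than(bound):
    """Files with max > bound form a suffix of the sequence sorted by max."""
    start = bisect_right(max_keys, bound)
    return {fid for _, fid in by_max[start:]}


print(candidates_less_than(4))     # {0}
print(candidates_greater_than(6))  # {1}
```

Each lookup costs a binary search plus the size of the result, instead of a scan over every index row.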

If there are too many files, we might need to consider structures like B-tree to access relevant files only. But this might not be a real issue; the index data should be orders of magnitude smaller than the source data.

Index Application

Data skipping indexes can be applied in three steps:

  1. Extract constraints about key columns.
  2. Filter out irrelevant files with the constraints and index data.
  3. Rewrite the logical relations with the remaining files.

Unlike covering indexes, different data skipping indexes can be applied for the same relation multiple times.

Examples of constraints include a < 4, a = 10, or a in (1, 2, 3). Combined with statistics or sketches like min/max values and/or bloom filters, this information can be used to weed out irrelevant files.
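As a rough illustration of the three steps with min/max statistics only, the Python sketch below represents each extracted constraint as a hypothetical `(operator, operand)` pair and keeps a file only if every constraint might be satisfied by some value in it. Unknown constraint types never cause a file to be skipped. The "rewritten relation" is modeled as nothing more than the list of remaining file paths.

```python
from typing import Dict, List, Tuple

# path -> (min, max) of column a, from the earlier example.
index: Dict[str, Tuple[int, int]] = {"/data/p0": (1, 6), "/data/p1": (5, 10)}


def may_match(min_value: int, max_value: int, op: str, operand) -> bool:
    """Return False only when no value in [min, max] can satisfy the constraint."""
    if op == "<":
        return min_value < operand
    if op == "=":
        return min_value <= operand <= max_value
    if op == "in":
        return any(min_value <= v <= max_value for v in operand)
    return True  # unsupported constraint: be conservative and keep the file


def remaining_files(index: Dict[str, Tuple[int, int]], constraints: List[tuple]) -> List[str]:
    """Steps 2 and 3: filter files with the constraints, return the new file list."""
    return [
        path
        for path, (mn, mx) in index.items()
        if all(may_match(mn, mx, op, operand) for op, operand in constraints)
    ]


print(remaining_files(index, [("<", 4)]))            # ['/data/p0']
print(remaining_files(index, [("=", 10)]))           # ['/data/p1']
print(remaining_files(index, [("in", (1, 2, 3))]))   # ['/data/p0']
```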

Theoretically, applying data skipping indexes for joins is possible. Still, the implementation will be complex, and benefits are unclear over the traditional broadcast hash join, especially when the indexed columns have a low correlation with the partitioning columns. Therefore, we will focus on filters for now.

Hybrid scan is trivial and requires no special handling for data skipping indexes, as we only filter out irrelevant files and the remaining files are not touched.

The estimated cost/benefit of applying data skipping indexes - TODO

Index Maintenance

Given a dataframe (a single relation consisting of files), an index name, a column/columns to index, a statistics/sketch type (min/max, bloom filter, etc.), we can create a data skipping index by either scanning every row in the files and building the specified statistics or looking up existing statistics in the files.

Incremental refresh is possible, and in fact, is not much different from a full refresh. During refresh, we drop rows for deleted files from the index data and add rows for added files. If there are no deleted files, incremental refresh can store new rows in new files, not touching old files. Full refresh can then merge such files later.
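The refresh logic described above can be sketched as follows. This is an assumption-laden illustration rather than the actual implementation: index rows are keyed by a hypothetical `(path, size, modified time)` tuple, sketches are plain `(min, max)` pairs, and `compute_sketch` stands in for whatever reads a source file. Only newly added files are scanned.

```python
from typing import Callable, Dict, Iterable, Tuple

FileKey = Tuple[str, int, int]  # (path, size in bytes, modified time)
Sketch = Tuple[int, int]        # (min, max)


def incremental_refresh(
    index: Dict[FileKey, Sketch],
    current_files: Iterable[FileKey],
    compute_sketch: Callable[[FileKey], Sketch],
) -> Tuple[Dict[FileKey, Sketch], Dict[FileKey, Sketch]]:
    """Return (kept, added): old rows for surviving files, new rows for added files."""
    current = list(current_files)
    current_set = set(current)
    # Drop rows whose source file no longer exists.
    kept = {key: sketch for key, sketch in index.items() if key in current_set}
    # Compute sketches only for files that are not indexed yet.
    added = {key: compute_sketch(key) for key in current if key not in index}
    return kept, added


old_index = {("/data/p0", 30, 100): (1, 6), ("/data/p1", 20, 100): (5, 10)}
# Made-up scenario: /data/p1 was deleted and /data/p2 was appended.
source_values = {("/data/p2", 25, 200): [7, 3, 12]}
now = [("/data/p0", 30, 100), ("/data/p2", 25, 200)]

kept, added = incremental_refresh(
    old_index, now, lambda key: (min(source_values[key]), max(source_values[key]))
)
print(kept)   # {('/data/p0', 30, 100): (1, 6)}
print(added)  # {('/data/p2', 25, 200): (3, 12)}
```

When nothing was deleted, `kept` equals the old index, so only `added` would need to be written out as new index files.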

Implementation

PRs

TODO

andrei-ionescu commented 3 years ago

@clee704, @imback82, @sezruby, @rapoth: There is another OSS data skipping index called XSkipper, which has been developed by IBM. Maybe we can just plug it into Hyperspace.

imback82 commented 3 years ago

Thanks @andrei-ionescu for the info. I just took a quick look at it and it doesn't seem straightforward to plug it in (@clee704 / @sezruby please correct me if I am wrong here), and since the project is fairly new, I am not sure if it's a good idea to have a dependency on it at this stage. Also, the index maintenance scenario seems different (no support for concurrent updates, etc.). But we will revisit when the project becomes mature.

This proposal is a scoped-down version of #335 (with a few more variations added) to support filter queries first.

sezruby commented 3 years ago

@andrei-ionescu I've skimmed the overall XSkipper codebase and I don't think we could easily plug it in. We are planning to support similar features: bloom filters (BF), min/max, and a partition elimination index (similar to valueList). FYI, @clee704 is working on this data skipping feature now.

clee704 commented 3 years ago

Implementation added as #443.

andrei-ionescu commented 3 years ago

This is a good direction to work on. Skipping indexes are very helpful in the context of very big datasets: terabytes of data spanning millions of files.

The start of the proposal is good, but it needs more information, and we need to have at least some partial answers to the following questions:

Some more information would be very helpful on how the plans will look. Can you provide some snippets of such plans altered to make use of skip indexes?

andrei-ionescu commented 3 years ago

@clee704, @sezruby Could you go through this proposal again and add more information in regards to the raised concerns above? Thanks!

clee704 commented 3 years ago
  • Will the index be bucketed?

No, because the index data is expected to be very small compared to the source data and a full scan of index data (or using a cached one) should be fine. For example, suppose we have 100 TB of data, stored in files, each file being 1 GB. It means there are 100,000 files and our index data has 100,000 rows. It might seem large, but actually it's not compared to the 100 TB of data.
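The arithmetic behind this estimate is shown below, using decimal units. The per-row size of 100 bytes is purely an assumption for illustration; the real size depends on the sketches stored for each file.

```python
TB = 10**12
GB = 10**9

source_bytes = 100 * TB
file_bytes = 1 * GB

# One index row per source file.
num_index_rows = source_bytes // file_bytes

# Hypothetical average size of one index row (file ID plus min/max values).
assumed_row_bytes = 100
index_bytes = num_index_rows * assumed_row_bytes

print(num_index_rows)               # 100000
print(index_bytes)                  # 10000000, i.e. about 10 MB
print(source_bytes // index_bytes)  # 10000000, source is 10^7 times larger
```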

If a full scan turns out to be too slow, then we might consider adding indexes to the index data. For example, we can index min/max sketches to speed up range queries.

  • What's the order inside the index? Is it ordered by the file names or by the min/max values? Having the index ordered and bucketed by the min/max values will reduce the query time. Having the index ordered by file names will help at update time. Can we have both?

There is no order, and having the index data ordered by file names won't help much because we don't support mutable files. All we support is deleted files and newly added files.

For incremental refreshes, we just ignore deleted files and add new index data files for newly appended files.

If deletion is a frequent operation, then we might consider partitioning the index data in the same way as the source data for a faster deletion of index data when an entire partition of source data is deleted, which seems to be a common operation in the field.

For ordering by min/max values, please refer to the first answer above.

  • If the index contains a lot of min/max overlaps, how will it be decided whether to use the index or not?

We'll start with a naive approach which is just applying the index regardless of the effectiveness. Except for the index processing time, the query plan doesn't get worse and the index processing time would be dwarfed by the query processing time.

Later, we might add mechanisms to help users, such as notifying the expected effectiveness of an index based on the selected sketches.

  • How is data restatement handled (ie: replace /data/p1 with a new /data/p1 with different values that will require re-compute and update the min/max values in the index)?

In Hyperspace, a file is considered to be equal to another file only if their paths, sizes, and modified times are the same. The tuple (path, size, modified time) can identify files in cases like the one you mentioned. I'll update the examples.
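A small Python sketch of this identity rule, using a hypothetical `SimpleFileIds` helper (not a Hyperspace class): a restated /data/p1 with a different size or modified time receives a new ID, so its old index row no longer matches any current file and a fresh row is computed for it.

```python
class SimpleFileIds:
    """Assigns a stable integer ID to each distinct (path, size, modified time)."""

    def __init__(self):
        self._ids = {}

    def get_id(self, path: str, size: int, modified_time: int) -> int:
        key = (path, size, modified_time)
        # Reuse the existing ID, or assign the next unused one.
        return self._ids.setdefault(key, len(self._ids))


ids = SimpleFileIds()
original = ids.get_id("/data/p1", 20, 100)
same_file = ids.get_id("/data/p1", 20, 100)
restated = ids.get_id("/data/p1", 24, 250)  # same path, new content

print(original == same_file)  # True
print(original == restated)   # False
```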

  • Will there be a different API for this type of index?

For users, the only difference is that they provide a different type of IndexConfig to Hyperspace.createIndex. Users can use DataSkippingIndexConfig to create data skipping indexes. Other than that the API is the same, i.e. methods like refreshIndex or optimizeIndex can be done with the same interface.

  • Updating the index in the case of 10x millions of files? What's the process to add 1 file with 100 million rows with values in between those already present in the index? Will it require a full rebuild?

Adding new files is easy. New index data will be stored in new parquet files.

Spark will be unusably slow with 10x millions of files to begin with, even without Hyperspace. In the first version of data skipping indexes, we will focus on a smaller number of files and will consider improving the performance for a large number of files.

  • What is the targeted SLA for consistency? I think Hyperspace offers consistency when the hybrid scan is enabled, but otherwise is there an SLA for getting to a consistent state? I'm bringing this up because if the index gets bigger and bigger, the process of keeping it up to date may become slower and slower, thus not being consistent with the data itself.

What does SLA stand for? Anyway, incremental refresh can add files and optimize can reduce the number of files. This is more or less the same as covering indexes.

  • Do we mark also the null values? How are the null values handled because they are neither min nor max?

For MinMaxSketch, we will ignore null values and just consider non-null values only. It means filters like a > 10 can be accelerated while filters like a > 10 or a is null cannot. We can add something like NullSketch if filters allowing null values are common.
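To make the null handling concrete, here is a minimal Python sketch in which `None` stands for a null value. The function names are hypothetical. It shows why a min/max sketch built from non-null values can skip a file for a > 10, yet must not skip it for a > 10 or a is null.

```python
def min_max_sketch(values):
    """Min/max over non-null values only; None if the file has no non-null value."""
    non_null = [v for v in values if v is not None]
    if not non_null:
        return None
    return min(non_null), max(non_null)


def can_skip_greater_than(sketch, bound):
    """Safe to skip for `a > bound` when no non-null value exceeds bound."""
    return sketch is None or sketch[1] <= bound


def can_skip_greater_than_or_null(sketch, bound):
    """Min/max says nothing about nulls, so the file can never be skipped."""
    return False


file_values = [1, None, 6]
sketch = min_max_sketch(file_values)
print(sketch)                             # (1, 6)
print(can_skip_greater_than(sketch, 10))  # True

# The null row matches `a > 10 or a is null`, so skipping would lose it.
matching = [v for v in file_values if v is None or v > 10]
print(matching)                           # [None]
```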

Some more information would be very helpful on how the plans will look. Can you provide some snippets of such plans altered to make use of skip indexes?

A basic example is given in the proposal. Please refer to the section "Data Skipping Index Basics". The transformed plan will be the same as the original plan, except that the relation node in the transformed plan will have fewer files than the original one.

andrei-ionescu commented 3 years ago
  • How is data restatement handled (ie: replace /data/p1 with a new /data/p1 with different values that will require re-compute and update the min/max values in the index)?

In Hyperspace, a file is considered to be equal to another file only if their paths, sizes, and modified times are the same. The tuple (path, size, modified time) can identify files in cases like the one you mentioned. I'll update the examples.

Can you link me to the updated example? I couldn't find them. Thanks

  • Updating the index in the case of 10x millions of files? What's the process to add 1 file with 100 million rows with values in between those already present in the index? Will it require a full rebuild?

Adding new files is easy. New index data will be stored in new parquet files.

So, it will add a new skipping index entry in the index with /data/p999999, min: -999999, max: 999999 overlapping with the majority of the already existing entries. No rebuild.

Spark will be unusably slow with 10x millions of files to begin with, even without Hyperspace. In the first version of data skipping indexes, we will focus on a smaller number of files and will consider improving the performance for a large number of files.

Depending on the Spark cluster size, 10 million files can easily be processed in parallel. Given that the biggest Spark cluster can hold up to 8000 nodes, and that the maximum number of cores in a VM is about 32, the total number of files that can be processed in parallel is 256,000. For 1 million files, 4 rounds of this process are easily doable, and even 40 rounds are easily achievable.
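The figures in this estimate can be checked with a few lines of Python, assuming (as the comment does) that each core processes one file at a time:

```python
import math

nodes = 8000
cores_per_node = 32

# Files that can be processed at the same time, one file per core.
parallel_files = nodes * cores_per_node

rounds_for_1m = math.ceil(1_000_000 / parallel_files)
rounds_for_10m = math.ceil(10_000_000 / parallel_files)

print(parallel_files)  # 256000
print(rounds_for_1m)   # 4
print(rounds_for_10m)  # 40
```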

Now, Hyperspace is the key. If the data skipping is well defined and implemented, then instead of using such a big cluster, it may reduce to a small cluster to achieve the same result.

We do have many such datasets: petabytes of data with millions of files. 1 PB / 1,000,000 = 1 GB, so 1 million files of 1 GB each amount to 1 PB of data. This is a use case that we see more and more.

Data Skipping index working for small datasets is a helper but having it work for very big datasets will be a game changer.

So, please think big and try to accommodate the heavy scenarios.

  • What is the targeted SLA for consistency? I think Hyperspace offers consistency when the hybrid scan is enabled, but otherwise is there an SLA for getting to a consistent state? I'm bringing this up because if the index gets bigger and bigger, the process of keeping it up to date may become slower and slower, thus not being consistent with the data itself.

What does SLA stand for?

SLA = Service Level Agreement.

My question is: how long does it take from a dataset being changed to the index being brought up to date with the new data? Or, what's the time for an incremental refresh in the case of a data skipping index? Does this update time stay constant over time as the data grows?

I'm bringing these questions up because if my data update/append takes 10s and the data skipping index update takes 10min, then the index will be unusable for fast-changing datasets.

  • Some more information would be very helpful on how the plans will look. Can you provide some snippets of such plans altered to make use of skip indexes?

A basic example is given in the proposal. Please refer to the section "Data Skipping Index Basics". The transformed plan will be the same as the original plan, except that the relation node in the transformed plan will have fewer files than the original one.

I did read the example and it makes sense. I'm more interested in the Spark plan itself, i.e. how .queryExecution.optimizedPlan and .queryExecution.sparkPlan look at query time.

@rapoth, @imback82, @sezruby: I'm CC'ing you as the discussion is in part leaning on the product definition side.

clee704 commented 3 years ago
  • How is data restatement handled (ie: replace /data/p1 with a new /data/p1 with different values that will require re-compute and update the min/max values in the index)?

In Hyperspace, a file is considered to be equal to another file only if their paths, sizes, and modified times are the same. The tuple (path, size, modified time) can identify files in cases like the one you mentioned. I'll update the examples.

Can you link me to the updated example? I couldn't find them. Thanks

It is in the PR description: https://github.com/microsoft/hyperspace/pull/461. I've also updated the issue description.

  • Updating the index in the case of 10x millions of files? What's the process to add 1 file with 100 million rows with values in between those already present in the index? Will it require a full rebuild?

Adding new files is easy. New index data will be stored in new parquet files.

So, it will add a new skipping index entry in the index with /data/p999999, min: -999999, max: 999999 overlapping with the majority of the already existing entries. No rebuild.

That's right. Incremental refresh will only add index data files unless there are deleted source data files. If there are deleted source data files, the index data is rewritten without entries for those deleted files, but this should be a cheap operation compared to reading all source data files again.

Spark will be unusably slow with 10x millions of files to begin with, even without Hyperspace. In the first version of data skipping indexes, we will focus on a smaller number of files and will consider improving the performance for a large number of files.

Depending on the Spark cluster size, 10 million files can easily be processed in parallel. Given that the biggest Spark cluster can hold up to 8000 nodes, and that the maximum number of cores in a VM is about 32, the total number of files that can be processed in parallel is 256,000. For 1 million files, 4 rounds of this process are easily doable, and even 40 rounds are easily achievable.

Now, Hyperspace is the key. If the data skipping is well defined and implemented, then instead of using such a big cluster, it may reduce to a small cluster to achieve the same result.

We do have many such datasets: petabytes of data with millions of files. 1 PB / 1,000,000 = 1 GB, so 1 million files of 1 GB each amount to 1 PB of data. This is a use case that we see more and more.

Data Skipping index working for small datasets is a helper but having it work for very big datasets will be a game changer.

So, please think big and try to accommodate the heavy scenarios.

Data skipping index per se will be scalable with >1 million files. The problem here is that the way we process files in Hyperspace is not optimized for it. We'll keep improving things in this regard.

  • What is the targeted SLA for consistency? I think Hyperspace offers consistency when the hybrid scan is enabled, but otherwise is there an SLA for getting to a consistent state? I'm bringing this up because if the index gets bigger and bigger, the process of keeping it up to date may become slower and slower, thus not being consistent with the data itself.

What does SLA stand for?

SLA = Service Level Agreement.

My question is: how long does it take from a dataset being changed to the index being brought up to date with the new data? Or, what's the time for an incremental refresh in the case of a data skipping index? Does this update time stay constant over time as the data grows?

I'm bringing these questions up because if my data update/append takes 10s and the data skipping index update takes 10min, then the index will be unusable for fast-changing datasets.

Incremental refresh should be fast, because it only looks at new data and appends new index data files without accessing previous source data or index data.

  • Some more information would be very helpful on how the plans will look. Can you provide some snippets of such plans altered to make use of skip indexes?

A basic example is given in the proposal. Please refer to the section "Data Skipping Index Basics". The transformed plan will be the same as the original plan, except that the relation node in the transformed plan will have fewer files than the original one.

I did read the example and it makes sense. I'm more interested in the Spark plan itself, i.e. how .queryExecution.optimizedPlan and .queryExecution.sparkPlan look at query time.

@rapoth, @imback82, @sezruby: I'm CC'ing you as the discussion is in part leaning on the product definition side.

At the query time, the plan won't be much different from the original plan. The only difference would be that file scan nodes in the optimized plan will access fewer files.