microsoft / hyperspace

An open source indexing subsystem that brings index-based query acceleration to Apache Spark™ and big data workloads.
https://aka.ms/hyperspace
Apache License 2.0

[PROPOSAL]: Support variable schema for included columns #263

Open pirz opened 3 years ago

pirz commented 3 years ago

Problem Description

This design proposal addresses feature request #229.

Currently, Hyperspace supports creating indexes only on data with a fixed schema: all columns referenced by an index (both indexed and included) must be present, with the same data types, in every source data record at index creation time and on every subsequent refresh.

This makes it impossible to create an index on data with an evolving schema.

Enforcing the "no change" restriction on "indexed" columns is unavoidable, as index records are bucketized and sorted on them. However, "included" columns are really payload data and do not affect the physical layout of index files, so the "no change" restriction on them can be lifted.

There are two cases to consider for schema changes: new columns being added to the source data, and existing columns being removed from it.

Goal

The user should be able to easily create/refresh an index on data with an evolving schema.

General Assumptions

  1. Indexed columns do not change, and they are always present in all source data records during index creation and any subsequent index refresh.
  2. Data records cannot have conflicting columns, i.e. columns with the same name but different data types, added as included columns across index versions. (Parquet's "mergeSchema" does not support this case either. For example, if a column's value is String in one set of records and Int in another, loading all of these records into a DataFrame with the "mergeSchema" option makes ParquetReader fail with: Failed to merge incompatible data types string and int.)
  3. Included columns are nullable, as they may not exist in all index records. We do not support defining a customized default value for an included column. We follow what Parquet does with missing columns when reading data with a pre-defined schema (i.e. it fills them with null), as illustrated in the sketch after this list.
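As a minimal sketch of assumption 3 (paths and column names are hypothetical, and an active SparkSession `spark` is assumed), reading files through a pre-defined schema makes the reader fill columns that are absent from a file with null:

import org.apache.spark.sql.types._

// C1 is an included column that is absent from the older files.
val schema = StructType(Seq(
  StructField("C0", IntegerType),
  StructField("C1", StringType, nullable = true)))

// Rows read from files that lack C1 come back with C1 = null.
val df = spark.read.schema(schema).parquet("/data/old", "/data/new")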

Solution Overview

Requirements

  1. The user should be able to create/refresh an index on the following data formats: Parquet, JSON, and CSV.
  2. The createIndex and refreshIndex APIs should let the user define "included" column(s) easily, by including or excluding columns from the data schema.
  3. The list of included columns can be changed during index refresh. The new list of included columns is defined by merging the included columns from the latest index version with the changes made during refresh, under these assumptions:
    • A new included column cannot conflict with indexed or existing included columns: it cannot be among the indexed columns, and there cannot be a column with the same name but a different data type among the existing included columns.
    • During index refresh in the "incremental" mode, if a new column is added to the included columns or an existing included column is excluded from the list, we only include/exclude it in the index records being created on new data files (i.e. appended files). If the user wants to add/remove the column to/from all existing index records, index refresh in the "full" mode should be used.

API changes

We make changes to the createIndex and refreshIndex APIs to let the user provide information about included columns.

Changes in IndexConfig

We modify the index config so that, instead of receiving a single list of included columns, it receives an instance of IncludedColumns, which contains two lists of columns: "include" and "exclude". These specify the columns to include or exclude, relative to a known reference schema, when defining the index's included columns.

case class IndexConfig(
    indexName: String,
    indexedColumns: Seq[String],
    includedColumns: IncludedColumns)

case class IncludedColumns(
    include: Seq[String] = Nil,
    exclude: Seq[String] = Nil)

Changes in createIndex API

The createIndex API itself remains unchanged; however, the list of included columns is now computed according to the above change made to IndexConfig.

During index creation, we use the schema of the user's given DataFrame df as the reference schema, and compute the included columns by applying the include/exclude lists from IndexConfig.includedColumns to this schema.

As an example, assume the user wants to create an index on a DataFrame df whose schema has 5 columns:

{"C0", "C1", "C2", "C3", "C4"}

and "C0" is picked as the indexed column.

Below is how IndexConfig.includedColumns can be used to define different sets of included columns:
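As a sketch, assuming that an exclude-only config means "all remaining columns of the reference schema minus the excluded ones" (index names are illustrative):

// df has columns {C0, C1, C2, C3, C4} and C0 is the indexed column.

// Included columns = {C1, C2}: list them explicitly.
IndexConfig("ix1", Seq("C0"), IncludedColumns(include = Seq("C1", "C2")))

// Included columns = {C2, C3, C4}: every remaining column from the
// reference schema except the excluded C1.
IndexConfig("ix2", Seq("C0"), IncludedColumns(exclude = Seq("C1")))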

Note that if lineage is enabled, we make a minor change to the way the index schema is extended:

  1. The lineage column is added to the index schema (no change to current behavior).
  2. If the source data is partitioned, we add any partitioning column that is not already part of the user-defined indexed/included columns, unless the column is explicitly excluded from the index by the user.

Changes in refreshIndex API

During index refresh, Hyperspace has to create a DataFrame, df, over the source data files that need to be indexed. We use the schema(schema: StructType) API on DataFrameReader to define the schema for df. Note that we do not need the full source data schema here, only the columns relevant to the index, i.e. the columns to be indexed or included during refresh.

During refresh, the user can use an instance of IncludedColumns (defined above) to modify the existing included columns.

If there is no change in the included columns, or if the user only removes some existing included column(s), we can simply use the index schema from the latest index version as the DataFrame schema when creating df.

If the user adds new columns to the included columns, we need to add those columns to the df schema. For Parquet and JSON this is straightforward, as they are self-describing formats; for CSV, however, the schema must be explicitly extended with the name and data type of each new included column, as in the sketch below.
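As a minimal sketch (paths and column types are hypothetical, SparkSession `spark` assumed), the refresh-time df schema is the latest index schema, minus any excluded columns, plus the newly included columns; the reader then fills columns missing from older files with null:

import org.apache.spark.sql.types._

val refreshSchema = StructType(Seq(
  StructField("C0", IntegerType),                   // indexed column
  StructField("C2", StringType, nullable = true),   // existing included column
  StructField("C3", StringType, nullable = true)))  // newly included column

// For CSV the explicit schema supplies both names and data types.
val df = spark.read.schema(refreshSchema).csv("/data/appended")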

To address the above, we add two "optional" arguments, includedColumns and schema, to the refreshIndex API:

  1. includedColumns is used to define changes in the included columns:
    • A column in includedColumns.include is treated as a new included column and will be added to index records created during refresh.
    • A column in includedColumns.exclude will be removed from the list of included columns, and index records created during refresh will not have it.
  2. schema is used when there are new columns in includedColumns.include; it defines the data type for each of those columns. We compute the schema for df by merging this schema with the latest index schema, minus the excluded columns defined in includedColumns.exclude (if any).

def refreshIndex(
    indexName: String,
    mode: String,
    includedColumns: IncludedColumns, // optional, used if a change is needed in the current included columns
    schema: StructType // optional, used if includedColumns.include is non-empty
)
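For example, a hypothetical call during an incremental refresh that drops "C1" and adds a new included column "C3" of type String might look like this (the index name and the Hyperspace instance hs are illustrative):

import org.apache.spark.sql.types._

hs.refreshIndex(
  "myIndex",
  "incremental",
  IncludedColumns(include = Seq("C3"), exclude = Seq("C1")),
  StructType(Seq(StructField("C3", StringType, nullable = true))))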

Index Metadata Changes

Index metadata is updated so that schemaString holds the merged schema across all index versions, and the current list of included columns is tracked explicitly alongside it.

Note that there could be some column(s) in schemaString that are not among the included columns. This can happen if the user adds a column as an included column in an earlier version of the index and then drops it when refreshing the index in incremental mode.

Moreover, we no longer need to store the source data schema under source.plan.properties.relations.head.dataSchemaJson. Currently, this schema is only used to define the df schema when refreshing. With the changes to refreshIndex explained above, it can now be computed from the index's schemaString and included columns, plus the new schema argument added to refreshIndex, as sketched below.
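A hedged sketch of that computation (refreshDfSchema is a hypothetical helper, not part of the Hyperspace API):

import org.apache.spark.sql.types.StructType

// df schema = (latest index schemaString) - (excluded columns) + (new `schema` arg)
def refreshDfSchema(
    latestIndexSchema: StructType,
    exclude: Seq[String],
    newColumns: StructType): StructType =
  StructType(
    latestIndexSchema.fields.filterNot(f => exclude.contains(f.name)) ++ newColumns.fields)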

Index leverage at query optimization/execution

Given that an index schema can now change as included columns are added or removed, index Parquet files are no longer guaranteed to have the same schema. One option for loading them into a DataFrame is the mergeSchema option; however, as this option is costly, we avoid it and instead load index files by providing the merged schema from the index metadata, i.e. by setting schema(<merged schema from metadata>) when loading the index content into a DataFrame. Existing rules also set this merged schema as the relation schema when replacing a data source with an index.
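As a sketch (the index path and columns are hypothetical), this avoids paying for schema merging at read time:

import org.apache.spark.sql.types._

// Merged schema taken from index metadata (schemaString).
val mergedSchema = StructType(Seq(
  StructField("C0", IntegerType),
  StructField("C2", StringType, nullable = true),
  StructField("C4", StringType, nullable = true),
  StructField("C5", StringType, nullable = true)))

val indexDf = spark.read.schema(mergedSchema).parquet("/indexes/myIndex")
// vs. the costly alternative we avoid:
// spark.read.option("mergeSchema", "true").parquet("/indexes/myIndex")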

Impact on Index Optimization

As the set of included columns can be changed when refreshing an index, index records that belong to the same bucket could have different schemas. This affects OptimizeAction, which merges separate smaller Parquet index files, whose content belongs to the same bucket, into a single large Parquet index file. However, a given Parquet file can only have a single schema; therefore, files with different schemas in the same bucket can no longer be merged as-is.

There are two approaches for fixing OptimizeAction, both illustrated under V5 in the example scenario below:
    • Approach-1: merge files across schemas, writing the result out with the merged schema.
    • Approach-2: only merge files that share a compatible schema, leaving the rest untouched (see the sketch after this list).
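A minimal sketch of Approach-2's grouping logic (IndexFile is a hypothetical stand-in for Hyperspace's internal file metadata):

// Group index files by (bucket, schema); only groups with more than one file
// and an identical schema are merged, so no Parquet file ever mixes schemas.
case class IndexFile(path: String, columns: Seq[String], bucket: Int)

def mergeCandidates(files: Seq[IndexFile]): Seq[Seq[IndexFile]] =
  files
    .groupBy(f => (f.bucket, f.columns))
    .values
    .filter(_.size > 1)
    .toSeq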

Example Scenario

Here is an example scenario showing how index metadata and index files change as an index goes through refreshes and optimization while both the source data and the included columns are changed.

V0: // after create
    Included Columns:   {C1, C2}
    Schema:         {C0, C1, C2}

    IX files: 
        I00 (schema = {C0, C1, C2})
        I01 (schema = {C0, C1, C2})

V1: // refresh incremental mode with some additional/deleted files
    // Change included columns: Drop "C1" and Add "C3"

    Included Columns:   {C2, C3}
    Schema:         {C0, C1, C2, C3}

    IX files:
        I00' (schema = {C0, C1, C2})
        I01' (schema = {C0, C1, C2})
        I10  (schema = {C0, C2, C3})

V2: // refresh full mode with some additional/deleted files
    // Change included columns: Drop "C3" and Add "C4"

    Included Columns:   {C2, C4}
    Schema:         {C0, C2, C4}

    IX files
        I20  (schema = {C0, C2, C4})
        I21  (schema = {C0, C2, C4})

V3: // refresh incremental mode with some additional/deleted files
    // Change included columns: Drop "C4" and Add "C5"

    Included Columns:   {C2, C5}
    Schema:         {C0, C2, C4, C5}

    IX files
        I20' (schema = {C0, C2, C4})
        I21' (schema = {C0, C2, C4})
        I31  (schema = {C0, C2, C5})
        I32  (schema = {C0, C2, C5})

V4: // refresh incremental mode with some additional/deleted files
    // No Change in included columns

    Included Columns:   {C2, C5}
    Schema:         {C0, C2, C4, C5}

    IX files
        I20'' (schema = {C0, C2, C4})
        I21'' (schema = {C0, C2, C4})
        I31'  (schema = {C0, C2, C5})
        I32'  (schema = {C0, C2, C5})
        I41   (schema = {C0, C2, C5})

V5: // optimize in full mode

    Included Columns:   {C2, C5}
    Schema:         {C0, C2, C4, C5}

    Approach-1: // Merge schemas when merging files
    IX files
        I40  (schema = {C0, C2, C4, C5})
        I41  (schema = {C0, C2, C4, C5})
        I42  (schema = {C0, C2, C4, C5})

    Approach-2: // Merge files with compatible schema
    IX files
        I20'' (schema = {C0, C2, C4})
        I21'' (schema = {C0, C2, C4})
        I31'  (schema = {C0, C2, C5})
        I51 (schema = {C0, C2, C5}) // merge of I32' and I41

Implementation

pirz commented 3 years ago

@imback82 @apoorvedave1 @sezruby @rapoth Can you take a look at this proposal and leave your comments (if any)? Thnx

imback82 commented 3 years ago
  • createIndex(df, mode = "include", columns = Seq("C1", "C2")): Pick {"C1", "C2"} as the included columns.

The current createIndex API takes in IndexConfig. Can you update the examples with indexed columns as well?

imback82 commented 3 years ago

For source data files, we need to use Parquet's mergeSchema as the data could have columns not listed as indexed or included columns.

What if source data files are non-Parquet?

pirz commented 3 years ago

The current createIndex API takes in IndexConfig. Can you update the examples with indexed columns as well?

The examples have been updated to use IndexConfig.

pirz commented 3 years ago

@imback82 I updated the proposal to handle schema in source data files. Specifically I added a new section: "Index Creation and Refresh changes" and modified "Index Metadata Changes". Plz take a look and let me know about your comments. Thnx!

sezruby commented 3 years ago

Generally LGTM! How about includeColumns / excludeColumns rather than mode + columns? I wonder whether incremental refresh will work well with the previous data. Could you create a prototype PR?

sezruby commented 3 years ago
def refreshIndex(
    indexName: String,
    mode: String,
    includedColumns: IncludedColumns, // optional, used if change needed in current included columns
    schema: StructType // optional, used if includedColumns.include is non-empty
)

Could includedColumns + schema be combined into a single includedColumnSchema (or just includedColumns) argument, since a schema already includes the column names?

And I think it's okay not to support optimizeIndex for a changed schema at first, to avoid complexity; we could add it later if needed.