delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

Fixes #348 Support Dynamic Partition Overwrite #371

Closed koertkuipers closed 2 years ago

koertkuipers commented 4 years ago

The goal of this PR is to support dynamic partition overwrite mode on writes to Delta.

To enable this on a per-write basis, add .option("partitionOverwriteMode", "dynamic"). It can also be set per SparkSession in the SQL config using .config("spark.sql.sources.partitionOverwriteMode", "dynamic").
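
A minimal sketch of both ways to enable it (assuming an existing SparkSession `spark` and a DataFrame `df`; the table path is made up for illustration):

```scala
// Per-write: only partitions touched by df are overwritten.
df.write
  .format("delta")
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .save("/tmp/delta/events")

// Per-session: make dynamic the default for all overwrites in this session.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
```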

Some limitations of this pull request: dynamic partition overwrite mode in combination with replaceWhere is not supported; if both are set, this results in an error. The SQL INSERT OVERWRITE syntax does not yet support dynamic partition overwrite. For that, more changes would need to be made to org.apache.spark.sql.delta.catalog.DeltaTableV2 and related classes.

pradomota commented 3 years ago

Any updates about this PR? I'm interested too ❤️

pradomota commented 3 years ago

gentle ping @jaceklaskowski

gdoron commented 3 years ago

Why is this PR stuck? Is it just because of the small code-standard issues and a typo, or is there something broader here? If it's only stuck on nits, I don't mind taking it from here. @jaceklaskowski

Namanj commented 3 years ago

@gdoron Can you please take this forward?

sabyasachinandy commented 3 years ago

Any updates? Can we please merge this PR?

gdoron commented 3 years ago

@dennyglee @brkyvz Can you please share some details on why it's stuck? We really want to see it in Delta on Databricks already; its absence heavily downgrades the appeal of Delta. A member of my team recently forgot to use replaceWhere and deleted an entire table (as I feared would happen here).

flvndh commented 2 years ago

@jaceklaskowski Is there any chance this could get merged in the near future?

koertkuipers commented 2 years ago

the failed tests are unrelated to this pull req: "Our attempt to download sbt locally to build/sbt-launch-0.13.18.jar failed. Please install sbt manually from http://www.scala-sbt.org/"

gdoron commented 2 years ago

@jaceklaskowski thanks A LOT for following up on this❣️

AbderrahmenM commented 2 years ago

Any news about this, please?

zbstof commented 2 years ago

There are conflicting changes introduced in https://github.com/delta-io/delta/commit/ae99d7190ad0f74b5126b1a4be7eddae1c7d046f which now need to be merged.

koertkuipers commented 2 years ago

> There are conflicting changes introduced in ae99d71 which now need to be merged.

just pushed an update where i merged in master and fixed the conflicts. replaceWhere is more complex now, so i just gave up on trying to support dynamic partition overwrite with replaceWhere for now...

Tagar commented 2 years ago

DBR 9.1 LTS release notes - https://docs.databricks.com/release-notes/runtime/9.1.html#delta-now-supports-arbitrary-replacewhere

> In combination with overwrite mode, the replaceWhere option can be used to simultaneously overwrite data that matches a predicate defined in the option. Previously, replaceWhere supported a predicate only over partition columns, but it can now be an arbitrary expression.
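
For illustration, a hedged sketch of the arbitrary-replaceWhere usage described in those release notes (the path, column, and predicate are made up; an existing DataFrame `df` is assumed):

```scala
// Overwrite only the rows matching the predicate; per the release notes above,
// the predicate is no longer restricted to partition columns.
df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "eventDate >= '2021-01-01' AND eventDate < '2021-02-01'")
  .save("/tmp/delta/events")
```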

koertkuipers commented 2 years ago

the failed tests don't seem to have anything to do with this pull req...


[info]   java.net.BindException: Cannot assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
[info]   at sun.nio.ch.Net.bind0(Native Method)

tdas commented 2 years ago

hey @koertkuipers ... after a lot of community demand, we have finally decided to add support for this :) are you still interested in contributing this feature?

tdas commented 2 years ago

alright... i saw your response on the issue about your interest. could you update your PR to latest master and run the tests? let's see if it still works.

koertkuipers commented 2 years ago

> alright... i saw your response on the issue about your interest. could you update your PR to latest master and run the tests? let's see if it still works.

it is now updated to latest master

koertkuipers commented 2 years ago

i would like to point out that dynamic partition overwrite and replaceWhere kind of get in each other's way. in this pull req i made the simplifying assumption that if replaceWhere is present it takes control and dynamic partition overwrite is effectively disabled.

dennyglee commented 2 years ago

From your perspective, do you think this is a docs problem - i.e. clarify that when replaceWhere is present it will effectively disable dynamic partition overwrite - or should we consider a future update/design?

tdas commented 2 years ago

@koertkuipers thank you for updating the PR. I agree with your concerns. let me start reviewing it and figure out the right approach. @allisonport-db can you also help review the PR?

koertkuipers commented 2 years ago

> From your perspective, do you think this is a docs problem - i.e. clarify that when replaceWhere is present it will effectively disable dynamic partition overwrite - or should we consider a future update/design?

for our use cases the simplifying assumption currently works fine (and we do use replaceWhere too), so i would vote for treating it as just a docs problem. however, i am biased: we have dynamic overwrite mode enabled by default across all sources (a user needs to explicitly set it to static to override it), since we find this a more reasonable default in general. so in our situation, where dynamic overwrite mode is the default and the user probably did not explicitly set it, if the user did explicitly set replaceWhere then it seems natural for replaceWhere to take over.
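
For context, a rough sketch of the kind of setup described above (the builder code is illustrative; the conf key is the standard Spark one, and the per-write override uses the option added by this PR):

```scala
import org.apache.spark.sql.SparkSession

// Dynamic overwrite as the session-wide default...
val spark = SparkSession.builder()
  .appName("example")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()

// ...which an individual write can still switch back to static explicitly:
// df.write.format("delta").mode("overwrite")
//   .option("partitionOverwriteMode", "static")
//   .save("/tmp/delta/events")
```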

koertkuipers commented 2 years ago

> Added a few comments. I think we need to add a few more test cases..
>
> 1. As commented in the code, fail for providing incorrect partition overwrite mode
>
> 2. Data written overwrites no partitions
>
> 3. Partition by 2+ columns
>
> And further discuss behavior in regards to replaceWhere

i think i added tests for these 3 cases

koertkuipers commented 2 years ago

> Thanks for updating. Looks pretty good to me; left a few more minor comments.
>
> I think we still need to further discuss behavior with respect to replaceWhere.
>
> I think the two main options are (1) replaceWhere takes precedence (what you've implemented) (2) we don't allow both to be set (throw an error)
>
> I think a minor variation could be doing (1) when replaceWhere is used and dynamic partition overwrite is only enabled in the Spark conf (since the data source option should take precedence) and (2) if they're both set as an option

i would be ok with an error if they're both set as an option, so your minor variation's option number 2. in DeltaWriteOptionsImpl we could check if both were set as an option, and then log a warning or throw an error.
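
As a rough illustration of that variation (the method name and wiring are hypothetical, not the actual DeltaWriteOptionsImpl code; only the option keys are real):

```scala
// Hypothetical sketch: reject the write when both options are set explicitly.
def assertNoConflictingOverwriteOptions(options: Map[String, String]): Unit = {
  val replaceWhereSet = options.contains("replaceWhere")
  val dynamicSetAsOption =
    options.get("partitionOverwriteMode").exists(_.equalsIgnoreCase("dynamic"))
  if (replaceWhereSet && dynamicSetAsOption) {
    throw new IllegalArgumentException(
      "'replaceWhere' cannot be used together with the " +
        "'partitionOverwriteMode=dynamic' DataFrameWriter option")
  }
}
```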

tdas commented 2 years ago

Another major point i completely missed in this first pass: according to the Spark docs, the SQL conf should make INSERT OVERWRITE follow dynamic partitioning too. But I don't see any tests with INSERT OVERWRITE. I think that must be tested as well.
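
For reference, the kind of SQL-path behavior being asked about might be exercised like this (table names are made up; an existing SparkSession `spark` is assumed):

```scala
// With the conf set to dynamic, INSERT OVERWRITE should replace only the
// partitions produced by the query instead of truncating the whole table.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sql("""
  INSERT OVERWRITE TABLE events PARTITION (eventDate)
  SELECT id, data, eventDate FROM staged_events
""")
```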

allisonport-db commented 2 years ago

Can you also add a test for when the initial table is partitioned, and we write an unpartitioned dataframe with partitionOverwriteMode = dynamic?

koertkuipers commented 2 years ago

> Can you also add a test for when the initial table is partitioned, and we write an unpartitioned dataframe with partitionOverwriteMode = dynamic?

i added a test where the table is created partitioned and then data is written with dynamic partition overwrite mode but no explicit partitionBy. is this what you asked for? i believe it behaves exactly the same as if partitionBy were there on the write (i.e. the table seems to know it's partitioned, so it's not necessary to explicitly mention it on the write). is this the expected behavior?
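
Roughly, the scenario described above looks like this (paths are made up; an existing SparkSession `spark` is assumed):

```scala
import org.apache.spark.sql.functions.{col, lit}

// Create a partitioned table, then overwrite with dynamic mode and no partitionBy;
// only the partition touched by the second write (part = 0) should be replaced.
spark.range(10).withColumn("part", col("id") % 2)
  .write.format("delta").partitionBy("part").save("/tmp/delta/tbl")

spark.range(5).withColumn("part", lit(0L))
  .write.format("delta")
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .save("/tmp/delta/tbl")
```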

koertkuipers commented 2 years ago

> Another major point i completely missed in this first pass: according to the Spark docs, the SQL conf should make INSERT OVERWRITE follow dynamic partitioning too. But I don't see any tests with INSERT OVERWRITE. I think that must be tested as well.

reposting this because it seemed to somehow attach to the wrong conversation...

ok, i went down the rabbit hole of trying to understand how this works. it seems like DeltaTableV2 supports V1_BATCH_WRITE (and uses InsertableRelation) but not BATCH_WRITE. my limited understanding here is that V1_BATCH_WRITE does not support OVERWRITE_DYNAMIC? is that correct?

tdas commented 2 years ago

Arrggh, my knowledge about this is rusty as well. I gotta do a deep dive to really understand this. But tell me, why did you have to go through the rabbit hole? Is it because just setting the conf does not work with INSERT OVERWRITE?

koertkuipers commented 2 years ago

> Arrggh, my knowledge about this is rusty as well. I gotta do a deep dive to really understand this. But tell me, why did you have to go through the rabbit hole? Is it because just setting the conf does not work with INSERT OVERWRITE?

correct. it throws an error saying that it is not supported.

this was my rabbit hole, not knowing too much about the SQL side...

it seems the tests support DPE (dynamic partition overwrite) out of the gate:

abstract class DeltaInsertIntoTests(
    override protected val supportsDynamicOverwrite: Boolean,
    override protected val includeSQLOnlyTests: Boolean)
  extends InsertIntoSQLOnlyTests {

this allows me to trivially turn this on for DeltaInsertIntoSQLSuite, which is great! the resulting error is:

[info] - InsertInto: overwrite - dynamic clause - dynamic mode *** FAILED *** (1 second, 102 milliseconds)
[info]   org.apache.spark.sql.AnalysisException: Table default.tbl does not support dynamic overwrite in batch mode.;
[info] OverwritePartitionsDynamic RelationV2[id#3832L, data#3833] default.tbl, false
[info] +- Project [id#3217L, data#3218]
[info]    +- SubqueryAlias tmp_view
[info]       +- View (`tmp_view`, [id#3217L,data#3218])
[info]          +- Project [_1#3213L AS id#3217L, _2#3214 AS data#3218]
[info]             +- LocalRelation [_1#3213L, _2#3214]
[info]   at org.apache.spark.sql.errors.QueryCompilationErrors$.unsupportedTableOperationError(QueryCompilationErrors.scala:805)
[info]   at org.apache.spark.sql.errors.QueryCompilationErrors$.unsupportedDynamicOverwriteInBatchModeError(QueryCompilationErrors.scala:821)
[info]   at org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck$.$anonfun$apply$1(TableCapabilityCheck.scala:54)
[info]   at org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck$.$anonfun$apply$1$adapted(TableCapabilityCheck.scala:40)

i think the check fails because of the capabilities set on DeltaTableV2:

  override def capabilities(): ju.Set[TableCapability] = Set(
    ACCEPT_ANY_SCHEMA, BATCH_READ,
    V1_BATCH_WRITE, OVERWRITE_BY_FILTER, TRUNCATE
  ).asJava

the required capabilities for DPE are checked in Spark's TableCapabilityCheck.scala:

      case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _, _)
        if !r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_DYNAMIC) =>
        throw QueryCompilationErrors.unsupportedDynamicOverwriteInBatchModeError(r.table)

so i can just add BATCH_WRITE and OVERWRITE_DYNAMIC to our capabilities, but the issue is that Delta doesn't seem to support BATCH_WRITE. the error i now get is:

[info] - InsertInto: overwrite - dynamic clause - dynamic mode *** FAILED *** (1 second, 77 milliseconds)
[info]   org.apache.spark.SparkException: Table does not support dynamic partition overwrite: DeltaTableV2(org.apache.spark.sql.delta.test.DeltaTestSparkSession@279998e3,file:/home/koert/src/tresata-opensource/delta/core/spark-warehouse/org.apache.spark.sql.delta.DeltaInsertIntoSQLSuite/tbl,Some(CatalogTable(
[info] Database: default
[info] Table: tbl
[info] Created Time: Wed Jun 01 14:21:06 PDT 2022
[info] Last Access: UNKNOWN
[info] Created By: Spark 3.2.0
[info] Type: MANAGED
[info] Provider: delta
[info] Location: file:/home/koert/src/tresata-opensource/delta/core/spark-warehouse/org.apache.spark.sql.delta.DeltaInsertIntoSQLSuite/tbl
[info] Partition Provider: Catalog)),Some(default.tbl),None,Map(),org.apache.spark.sql.util.CaseInsensitiveStringMap@1f)
[info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.dynamicPartitionOverwriteUnsupportedByTableError(QueryExecutionErrors.scala:1627)
[info]   at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:76)
[info]   at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:38)

the "fix" for this is to have WriteIntoDeltaBuilder extend SupportsDynamicOverwrite and add a small method override:

  override def overwriteDynamicPartitions(): WriteBuilder = {
    options.put("partitionOverwriteMode", "DYNAMIC")
    this
  }

now the error is:

[info] - InsertInto: overwrite - dynamic clause - dynamic mode *** FAILED *** (1 second, 119 milliseconds)
[info]   java.lang.UnsupportedOperationException: class org.apache.spark.sql.delta.catalog.WriteIntoDeltaBuilder$$anon$1: Batch write is not supported
[info]   at org.apache.spark.sql.connector.write.Write.toBatch(Write.java:54)
[info]   at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:309)
[info]   at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:308)
[info]   at org.apache.spark.sql.execution.datasources.v2.OverwritePartitionsDynamicExec.run(WriteToDataSourceV2Exec.scala:271)
[info]   at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)

so the error is thrown because WriteIntoDeltaBuilder didn't override toBatch, which is required if you declare the BATCH_WRITE capability. it's explained in Spark's Write.java here:

  /**
   * Returns a {@link BatchWrite} to write data to batch source. By default this method throws
   * exception, data sources must overwrite this method to provide an implementation, if the
   * {@link Table} that creates this write returns {@link TableCapability#BATCH_WRITE} support in
   * its {@link Table#capabilities()}.
   */
  default BatchWrite toBatch() {
    throw new UnsupportedOperationException(description() + ": Batch write is not supported");
  }

well, that's where i gave up. any suggestions welcome!

tdas commented 2 years ago

oh wow! that is quite a rabbit hole. thank you for explaining the issue step by step. i think we need to separate SQL support from this PR.

tdas commented 2 years ago

Oh, please do add a description to the PR :)

koertkuipers commented 2 years ago

> Oh, please do add a description to the PR :)

done