eto-ai / rikai

Parquet-based ML data format optimized for working with unstructured data
https://rikai.readthedocs.io/en/latest/
Apache License 2.0

`df.saveAsTable()` does not take partitionBy #640

Closed · eddyxu closed this issue 2 years ago

eddyxu commented 2 years ago
df.write.partitionBy("col").format("rikai").saveAsTable("blabla")

does not write files in partitions.

But the following code, which does not specify the rikai format, does write in partitions.

df.write
  .partitionBy("col")
  .mode("overwrite")
  .saveAsTable("parts")
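
For completeness, a minimal end-to-end repro sketch (assuming a Spark shell with the rikai jar on the classpath; the table, column, and value names here are illustrative):

import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "col")

// Expected: partition subdirectories col=a/ and col=b/ under the table location.
// Observed with the rikai format: a flat, unpartitioned layout.
df.write.partitionBy("col").format("rikai").saveAsTable("rikai_parts")

// Control: the default format writes the col=a/ and col=b/ subdirectories as expected.
df.write.partitionBy("col").mode("overwrite").saveAsTable("parquet_parts")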
Renkai commented 2 years ago

I will take a look.

ref: https://github.com/eto-ai/rikai/commit/70e1ccb9d5c22bfc07aaa4d23148f42ad58cccb5

Renkai commented 2 years ago

What's the environment for reproducing this? A local cluster, an open-source cluster, or the Databricks platform?

Renkai commented 2 years ago

The save-to-path method works fine; I'll try to find out what is special about saveAsTable. https://github.com/eto-ai/rikai/blob/70e1ccb9d5c22bfc07aaa4d23148f42ad58cccb5/src/test/scala/ai/eto/rikai/RikaiRelationTest.scala#L61
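
To illustrate the contrast (a sketch; the path and table name are illustrative):

// Writing to a path partitions correctly:
df.write.format("rikai").partitionBy("col").save("/tmp/rikai_partitioned")

// Writing through the session catalog does not:
df.write.format("rikai").partitionBy("col").saveAsTable("rikai_partitioned_tbl")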

Renkai commented 2 years ago

In my local test,

df.write.format("rikai").partitionBy("col").saveAsTable("blabla")

also fails, so it's not an accident of one option overwriting another. Maybe the rikai format can't satisfy some condition that partitionBy requires.

Renkai commented 2 years ago

The options here should carry __partition_columns in table.storage.properties, but they don't; I will find out why. Call stack:

ai.eto.rikai.RikaiOptions.<init>(RikaiOptions.scala:20)
ai.eto.rikai.RikaiRelationProvider.createRelation(RikaiRelationProvider.scala:68)
org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:532)
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:220)
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:177)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:753)
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:731)
org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:626)
ai.eto.rikai.RikaiRelationTest.$anonfun$new$1(RikaiRelationTest.scala:75)
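
For reference, on the save-to-path route Spark 3.1 encodes the partitionBy columns into the writer options as a JSON list under DataSourceUtils.PARTITIONING_COLUMNS_KEY, which is the "__partition_columns" key mentioned above. A sketch of reading them back on the provider side, assuming the DataSourceUtils helpers are accessible from our package (partitionColumnsFrom is a hypothetical helper name):

import org.apache.spark.sql.execution.datasources.DataSourceUtils

// Hypothetical helper: recover the partitionBy columns from the options map
// handed to createRelation. On the save-to-path route the key is present;
// on the saveAsTable route (this issue) it is missing, so this returns Nil.
def partitionColumnsFrom(options: Map[String, String]): Seq[String] =
  options
    .get(DataSourceUtils.PARTITIONING_COLUMNS_KEY) // "__partition_columns"
    .map(DataSourceUtils.decodePartitioningColumns)
    .getOrElse(Nil)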
Renkai commented 2 years ago

DataSource has a partitionColumns field; it should be passed as a parameter here, but it isn't. Do we need to patch Spark? How does parquet get this parameter?

https://github.com/apache/spark/blob/v3.1.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L532
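
Likely relevant: parquet implements FileFormat, and DataSource.planForWritingFileFormat threads partitionColumns straight into InsertIntoHadoopFsRelationCommand, whereas rikai goes through CreatableRelationProvider, whose interface only exposes an options map, so any partition info has to ride in parameters or it is lost:

// org.apache.spark.sql.sources.CreatableRelationProvider in Spark 3.1:
trait CreatableRelationProvider {
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String], // partition columns would have to arrive here
      data: DataFrame): BaseRelation
}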