snowflakedb / spark-snowflake

Snowflake Data Source for Apache Spark.
http://www.snowflake.net
Apache License 2.0

possible error or different behaviour when writing in append mode and column mapping by name #468

Open josemanuel-hita opened 1 year ago

josemanuel-hita commented 1 year ago

Hello,

I'm running into a problem when writing in APPEND mode with column mapping by NAME configured. If the target table doesn't exist, the connector fails with the following exception:

net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error:
Object 'table_doesnt_exist' does not exist or not authorized.
  at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:127)
  at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:67)
  at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:456)
  at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:366)
  at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:468)
  at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:196)
  at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:133)
  at net.snowflake.client.core.SFStatement.describe(SFStatement.java:152)
  at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.describeSqlIfNotTried(SnowflakePreparedStatementV1.java:97)
  at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.getMetaData(SnowflakePreparedStatementV1.java:552)
  at net.snowflake.spark.snowflake.DefaultJDBCWrapper$DataBaseOperations.tableMetaDataFromStatement(SnowflakeJDBCWrapper.scala:570)
  at net.snowflake.spark.snowflake.DefaultJDBCWrapper$DataBaseOperations.tableMetaData(SnowflakeJDBCWrapper.scala:563)
  at net.snowflake.spark.snowflake.JDBCWrapper.resolveTable(SnowflakeJDBCWrapper.scala:82)
  at net.snowflake.spark.snowflake.SnowflakeWriter.save(SnowflakeWriter.scala:65)
  at net.snowflake.spark.snowflake.DefaultSource.createRelation(DefaultSource.scala:141)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
  at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
  ... 47 elided

I understand that writing in APPEND mode implies the target table should already exist before the operation. However, if I execute the same operation with column mapping by ORDER configured, the operation doesn't fail and the connector automatically creates the target table. In that case, the following query appears in the table history:

create table if not exists identifier(...) (...)

I couldn't find any documentation about the different behaviours of APPEND mode depending on the column mapping configuration. Are these the expected behaviours for both column mapping options, or is it an issue when column mapping is set by name?

I get the same behaviour with different snowflake-connector versions:

Code to reproduce append by name (exception occurs)

val target_table = "table_doesnt_exist"
val dfReader = df.option("dbtable", "*****").load()  // df is the configured Snowflake DataFrameReader (connection options elided)
val sfWriteConfigMapByName = Map(
  ...
  "column_mismatch_behavior" -> "ERROR",
  "column_mapping" -> "NAME",
  "keep_column_case" -> "false",
  "dbtable" -> target_table
)
dfReader.write.format("snowflake").options(sfWriteConfigMapByName).mode("append").option("truncate_table", "off").save()

Code to reproduce append by order (target table created)

val target_table = "table_doesnt_exist"
val dfReader = df.option("dbtable", "*****").load()  // df is the configured Snowflake DataFrameReader (connection options elided)
val sfWriteConfigMapByOrder = Map(
  ...
  "column_mismatch_behavior" -> "ERROR",
  "column_mapping" -> "ORDER",
  "keep_column_case" -> "false",
  "dbtable" -> target_table
)
dfReader.write.format("snowflake").options(sfWriteConfigMapByOrder).mode("append").option("truncate_table", "off").save()

(Note: remember to drop the table if you run the code again :-))

sfc-gh-mrui commented 1 year ago

@josemanuel-hita Thanks for reporting this issue; we can reproduce it. Setting column_mapping to name means the Spark connector needs to reorder the DataFrame's columns to match the target table's column order for the write, which implies the target table must already exist. So this is more of a user error, but the error message is misleading. The Spark connector may need to raise a more readable error message for this case.
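A minimal caller-side workaround sketch (not connector behaviour): pre-create the target table before the append so that by-name mapping has a schema to map against. Here sfOptions stands in for the elided connection options, the column list is only illustrative, and Utils.runQuery is the connector utility for executing SQL statements:

import net.snowflake.spark.snowflake.Utils

// Illustrative DDL: replace the column list with the real target schema
Utils.runQuery(sfOptions, s"create table if not exists $target_table (id number, name string)")

// With the table in place, column_mapping = NAME has columns to map against
dfReader.write
  .format("snowflake")
  .options(sfWriteConfigMapByName)
  .mode("append")
  .option("truncate_table", "off")
  .save()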

josemanuel-hita commented 1 year ago

Thanks for your quick answer @sfc-gh-mrui. Following your reasoning, one could argue that selecting append mode when the target table doesn't exist is in fact a user error, regardless of the column mapping selected... Couldn't the connector skip reordering the DataFrame's columns when the target table doesn't exist? In that case, the target table could be created using the same column order as the DataFrame...
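Roughly, I have in mind something like this caller-side sketch (the tableExists helper and the sfOptions placeholder are assumptions on my side, not connector behaviour): fall back to ORDER mapping for the first write, when there is no table to map names against.

import net.snowflake.spark.snowflake.Utils
import scala.util.Try

// Crude existence probe: Utils.runQuery throws if the table is missing
def tableExists(opts: Map[String, String], table: String): Boolean =
  Try(Utils.runQuery(opts, s"desc table $table")).isSuccess

// Use NAME mapping only when the target table already exists
val mapping = if (tableExists(sfOptions, target_table)) "NAME" else "ORDER"

dfReader.write
  .format("snowflake")
  .options(sfWriteConfigMapByName + ("column_mapping" -> mapping))
  .mode("append")
  .option("truncate_table", "off")
  .save()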

Thank you!

sfc-gh-mrui commented 1 year ago

@josemanuel-hita As for append mode with column_mapping = order, the connector didn't create the table either in the beginning; that behaviour was added later after some customers requested it. If the Spark connector internally created the target table from the DataFrame's columns for this use case, it would effectively ignore the customer's column_mapping setting and fall back to the default order-based behaviour. Some users may like that, some may not, so we need to review it with the PM before deciding to do it. Could you please contact a support engineer to file a support ticket? I can then work with the PM on it.

josemanuel-hita commented 1 year ago

Hi, I'm talking with my Product team; I'll let you know if there's any news. Thank you!