microsoft / sql-spark-connector

Apache Spark Connector for SQL Server and Azure SQL
Apache License 2.0

DataFrame.write to a table containing GENERATED ALWAYS and auto-generated columns is failing #123

Open sfsjoshi opened 3 years ago

sfsjoshi commented 3 years ago

The write below fails with a "No key found" exception if UUID, SysStartTime, and SysEndTime are not part of the DataFrame. If all three fields are added to the DataFrame, it throws "Cannot insert an explicit value into a GENERATED ALWAYS column in table".

Any help is highly appreciated. Thanks

Exception details:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 25, 10.139.64.4, executor 1): com.microsoft.sqlserver.jdbc.SQLServerException: Cannot insert an explicit value into a GENERATED ALWAYS column in table '<...>'. Use INSERT with a column list to exclude the GENERATED ALWAYS column, or insert a DEFAULT into GENERATED ALWAYS column.

Environment: Azure Databricks 7.6 runtime; Azure SQL Database; Language: PySpark

PySpark Code


df = spark.read.parquet("<path to parquet file>")  # read the source parquet file

df.write \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .mode("append") \
  .option("url", url) \
  .option("dbtable", "TEMPORAL_TABLE") \
  .option("user", _username) \
  .option("password", _password) \
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")\
  .option("schemaCheckEnabled", False)\
  .save()

Azure SQL Temporal Table Definition:


CREATE TABLE DBO.TEMPORAL_TABLE(
    [UUID] [varchar](255) NOT NULL,
    [SERVICE_ID] [bigint] NULL,
    [START_DATE] [datetime2](7) NULL,
    [END_DATE] [datetime2](7) NULL,
    [CHANGED_ON] [datetime2](7) NULL,
    [operation] [char](1) NULL,
    [SysStartTime] [datetime2](7) GENERATED ALWAYS AS ROW START NOT NULL,
    [SysEndTime] [datetime2](7) GENERATED ALWAYS AS ROW END NOT NULL,
 CONSTRAINT [PK_TEMPORAL] PRIMARY KEY CLUSTERED 
(
    [UUID] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY],
    PERIOD FOR SYSTEM_TIME ([SysStartTime], [SysEndTime])
) ON [PRIMARY]
WITH
(
SYSTEM_VERSIONING = ON ( HISTORY_TABLE = [history].[TEMPORAL_TABLE_HISTORY] )
)
GO

ALTER TABLE DBO.TEMPORAL_TABLE ADD  DEFAULT (newid()) FOR [UUID]
GO

ALTER TABLE DBO.TEMPORAL_TABLE ADD  DEFAULT (getutcdate()) FOR [SysStartTime]
GO

ALTER TABLE DBO.TEMPORAL_TABLE ADD  DEFAULT (CONVERT([datetime2],'9999-12-31 23:59:59.9999999')) FOR [SysEndTime]
GO

sfsjoshi commented 3 years ago

@shivsood, would you be able to provide any guidance on the issue above (regarding auto-generated / GENERATED ALWAYS columns) with the Spark SQL Server connector? Thanks for your time.

sfsjoshi commented 3 years ago

This is a showstopper for me in leveraging the Spark connector; otherwise I would have no option but to fall back to plain JDBC. Any help is highly appreciated.
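
For reference, the plain JDBC fallback referred to here is Spark's generic jdbc data source, which (as far as I know) builds its INSERT statements from the DataFrame's own column list, so columns the DataFrame does not carry (UUID, SysStartTime, SysEndTime) should be left to their SQL-side defaults or generated values. A minimal sketch, reusing the url/_username/_password variables from the snippet above and trading away this connector's bulk-copy performance:

df.write \
  .format("jdbc") \
  .mode("append") \
  .option("url", url) \
  .option("dbtable", "dbo.TEMPORAL_TABLE") \
  .option("user", _username) \
  .option("password", _password) \
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
  .save()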

shivsood commented 3 years ago

> The write below fails with a "No key found" exception if UUID, SysStartTime, and SysEndTime are not part of the DataFrame. If all three fields are added to the DataFrame, it throws "Cannot insert an explicit value into a GENERATED ALWAYS column in table".

@sfsjoshi Have you tried including UUID but not providing SysStartTime and SysEndTime? Can you enable debug on Spark and send the Spark driver/executor logs?
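
For reference, a minimal sketch of that suggestion on the Spark side, assuming the DataFrame currently carries the two period columns (names taken from the table definition above) and reusing the options from the original snippet:

# Keep UUID, drop only the GENERATED ALWAYS period columns before writing.
df_to_write = df.drop("SysStartTime", "SysEndTime")

df_to_write.write \
  .format("com.microsoft.sqlserver.jdbc.spark") \
  .mode("append") \
  .option("url", url) \
  .option("dbtable", "dbo.TEMPORAL_TABLE") \
  .option("user", _username) \
  .option("password", _password) \
  .option("schemaCheckEnabled", False) \
  .save()

The schemaCheckEnabled option is carried over from the original repro; with fewer DataFrame columns than table columns, the connector's strict schema check appears to be what raises the "differing numbers of columns" error.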

@luxu1-ms as FYR

sfsjoshi commented 3 years ago

Attached logs: stderr.txt, log4j-active.txt, stdout.txt

Thank you for helping with this issue; please see below for the requisite details. Yes, I executed with the same configuration again and encountered the "differing numbers of columns" error, and if I add all columns it throws "Cannot insert an explicit value into a GENERATED ALWAYS column in table".

Py4JJavaError                             Traceback (most recent call last)
<command-1081403488109968> in <module>
      6     .option("user", user_name) \
      7     .option("password", password) \
----> 8     .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
      9     .save()

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    826             self.format(format)
    827         if path is None:
--> 828             self._jwrite.save()
    829         else:
    830             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    125     def deco(*a, **kw):
    126         try:
--> 127             return f(*a, **kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o367.save.
: java.sql.SQLException: Spark Dataframe and SQL Server table have differing numbers of columns
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.assertCondition(BulkCopyUtils.scala:624)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.assertIfCheckEnabled(BulkCopyUtils.scala:638)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.matchSchemas(BulkCopyUtils.scala:315)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.getColMetaData(BulkCopyUtils.scala:269)
    at com.microsoft.sqlserver.jdbc.spark.Connector.write(Connector.scala:79)
    at com.microsoft.sqlserver.jdbc.spark.DefaultSource.createRelation(DefaultSource.scala:64)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:91)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:157)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:1018)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:841)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:198)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1018)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:439)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:423)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

TheScriptingGuy commented 3 years ago

Hi, I'm having the same issue with generated columns and period columns, both inside and outside temporal tables. The following error stack is from a table with two GENERATED ALWAYS columns and a PERIOD definition on a regular table (so not a temporal table).

Note: I want SQL Server to generate those fields rather than supplying any values; I only want to insert the fields that I have in the Spark cluster.


Py4JJavaError                             Traceback (most recent call last)
in
      7     .option("password", dbutils.secrets.get(scope = "keyvaultscope", key = "sqldbpassword")) \
      8     .option("tableLock", True) \
----> 9     .mode('Append') \
     10     .save()

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    826             self.format(format)
    827         if path is None:
--> 828             self._jwrite.save()
    829         else:
    830             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    125     def deco(*a, **kw):
    126         try:
--> 127             return f(*a, **kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o374.save.
: java.sql.SQLException: Spark Dataframe and SQL Server table have differing numbers of columns
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.assertCondition(BulkCopyUtils.scala:624)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.assertIfCheckEnabled(BulkCopyUtils.scala:638)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.matchSchemas(BulkCopyUtils.scala:315)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.getColMetaData(BulkCopyUtils.scala:269)
    at com.microsoft.sqlserver.jdbc.spark.Connector.write(Connector.scala:79)
    at com.microsoft.sqlserver.jdbc.spark.DefaultSource.createRelation(DefaultSource.scala:64)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:91)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:200)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$3(SparkPlan.scala:252)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:157)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:1018)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:841)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:198)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1018)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:439)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:423)
    at sun.reflect.GeneratedMethodAccessor669.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

shivsood commented 3 years ago

@luxu1-ms are you able to repro this?

arvindshmicrosoft commented 3 years ago

It looks like temporal tables, just like graph tables, need special handling. By extension, the same applies when columns in the table have defaults associated with them and we therefore do not pass those columns in the Spark DataFrame. All of these cases need to be handled correctly for the connector to be usable with such tables.
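
As a point of reference (a sketch only, not the code from the commit mentioned below): SQL Server exposes both kinds of columns through catalog metadata, so either the connector or a user deciding which DataFrame columns to drop can identify them with a query like this against the repro table:

SELECT c.name, c.generated_always_type_desc, c.default_object_id
FROM sys.columns AS c
WHERE c.object_id = OBJECT_ID('dbo.TEMPORAL_TABLE')
  AND (c.generated_always_type <> 0 OR c.default_object_id <> 0);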

I made a (completely unofficial) attempt to fix the above issue (the repro provided has a temporal table with system-generated columns; the change also handles the case where a column that has a default associated with it in SQL is not passed in the DataFrame). The commit is here in case you are interested in building the connector and testing on your own. I tested this with an equivalent of the OP's repro, and it did work.

Obviously I have NOT had the time to add a formal test, nor has a full regression suite been run. So again, consider this commit completely unofficial, experimental, and provided as-is.

B4PJS commented 2 years ago

Can you try creating a view over your temporal table that omits the generated columns, and then inserting into that view instead? I haven't tested it, but it may work.
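
For anyone who wants to try this, a sketch of the idea (untested, as noted, and the view name is illustrative): expose only the columns the DataFrame actually supplies, then point the connector's dbtable option at the view.

CREATE VIEW dbo.TEMPORAL_TABLE_WRITE AS
SELECT [UUID], [SERVICE_ID], [START_DATE], [END_DATE], [CHANGED_ON], [operation]
FROM dbo.TEMPORAL_TABLE;
GO

The write would then use .option("dbtable", "dbo.TEMPORAL_TABLE_WRITE") instead of the base table; whether the connector's bulk-copy path accepts a view as a target is part of what would need testing.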