microsoft / sql-spark-connector

Apache Spark Connector for SQL Server and Azure SQL
Apache License 2.0

TIMESTAMP to datetime2 fails on Azure Synapse #246

Open m4rcs opened 9 months ago

m4rcs commented 9 months ago

I ran into a problem when writing TIMESTAMP data from Databricks to an Azure Synapse datetime2 column using this connector. I tested the connector builds for Spark 3.1, 3.3, and 3.4; in every environment the write fails with this error message:

Error calling: bcp_done(this->GetHdbc()) | SQL Error Info: SrvrMsgState: 1, SrvrSeverity: 16, Error <1>: ErrorMsg: [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column type from bcp client for colid 1. | Error calling: pConn->Done() | state: FFFF, number: 8861, active connections: 28

The complete error message is included further down.

Can you help me get this fixed? Did I miss some configuration?

Test Setup

This is the minimal example I created to help debug the issue: a single column and row containing one timestamp, written to a Synapse table that has only a datetime2 column.

Databricks Notebook

from msal import ConfidentialClientApplication

# Get Access Token
kv_scope = "<redacted>"
kv_key = "<redacted>"
kv_username = "<redacted>"

service_principal_secret = dbutils.secrets.get(scope=kv_scope, key=kv_key)
# ConfidentialClientApplication(client_id, client_credential, authority)
app = ConfidentialClientApplication(
    kv_username,
    service_principal_secret,
    "<redacted>",
)
result = app.acquire_token_for_client(scopes=["https://database.windows.net/.default"])
access_token = result["access_token"]

df = spark.sql("select current_timestamp() as timestamp")

# Write the single-row DataFrame to the Synapse table via bulk copy
(
    df.write.format("com.microsoft.sqlserver.jdbc.spark")
    .mode("append")
    .option("url", "<url to synapse>")
    .option("dbtable", "dbt.d_test_t")
    .option("accessToken", access_token)
    .option("mssqlIsolationLevel", "READ_UNCOMMITTED")
    .save()
)

dbt.d_test_t Schema in Synapse

CREATE TABLE [dbt].[d_test_t]
(
    [timestamp] [datetime2](7)  NOT NULL
)
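
For reference, the target column's type can be double-checked from the Synapse side with a metadata query. This is a small diagnostic sketch (not part of the original report), assuming the table above:

SELECT c.name AS column_name,
       t.name AS type_name,
       c.precision,
       c.scale
FROM sys.columns AS c
JOIN sys.types AS t
    ON c.user_type_id = t.user_type_id
WHERE c.object_id = OBJECT_ID('dbt.d_test_t');

For the table as defined here, this should report type_name = datetime2 with scale 7.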

Error Message

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-573169803774740> in <cell line: 20>()
     18
     19
---> 20 df.write.format("com.microsoft.sqlserver.jdbc.spark").mode("append").option(
     21     "url",
     22     "<redacted>",

/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
     46             start = time.perf_counter()
     47             try:
---> 48                 res = func(*args, **kwargs)
     49                 logger.log_success(
     50                     module_name, class_name, function_name, time.perf_counter() - start, signature

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    964             self.format(format)
    965         if path is None:
--> 966             self._jwrite.save()
    967         else:
    968             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    194     def deco(*a: Any, **kw: Any) -> Any:
    195         try:
--> 196             return f(*a, **kw)
    197         except Py4JJavaError as e:
    198             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o462.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7) (10.136.172.119 executor 0): com.microsoft.sqlserver.jdbc.SQLServerException: 110802;An internal DMS error occurred that caused this operation to fail. Details: Please use this Error ID when contacting your Administrator for assistance. EID:(172b394523f94ac3aeaa183dd70632cc)
SqlNativeBufferBufferBulkCopy.WriteTdsDataToServer, error in OdbcDone: SqlState: 42000, NativeError: 4816, 'Error calling: bcp_done(this->GetHdbc()) | SQL Error Info: SrvrMsgState: 1, SrvrSeverity: 16,  Error <1>: ErrorMsg: [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column type from bcp client for colid 1. | Error calling: pConn->Done() | state: FFFF, number: 8861, active connections: 28', Connection String: Driver={pdwodbc17e};app=TypeD00-DmsNativeWriter:DB00000V\mpdwsvc (64100)-ODBC;autotranslate=no;trusted_connection=yes;server=\\.\pipe\_DB_31-fb745f61d208--0\sql\query;database=Distribution_25
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
    at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:283)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:129)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:37)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:26)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1589)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$300(SQLServerBulkCopy.java:65)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:663)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7418)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3272)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:697)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1654)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:620)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.bulkWrite(BulkCopyUtils.scala:110)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.savePartition(BulkCopyUtils.scala:58)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.$anonfun$write$2(BestEffortSingleInstanceStrategy.scala:43)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.$anonfun$write$2$adapted(BestEffortSingleInstanceStrategy.scala:42)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1051)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1051)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2786)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)
    at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3381)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3313)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3304)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3304)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1428)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1428)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1428)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3593)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3531)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3519)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1177)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1165)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2746)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2729)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2767)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2786)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2811)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1051)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:446)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1049)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.write(BestEffortSingleInstanceStrategy.scala:42)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceConnector$.writeInParallel(SingleInstanceConnector.scala:35)
    at com.microsoft.sqlserver.jdbc.spark.Connector.write(Connector.scala:80)
    at com.microsoft.sqlserver.jdbc.spark.DefaultSource.createRelation(DefaultSource.scala:66)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:49)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:78)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:89)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:229)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:249)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:399)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:194)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:148)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:349)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:229)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:214)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:227)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:220)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:99)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:298)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:294)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:220)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:354)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:220)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:174)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:165)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:256)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:430)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:397)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:259)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 110802;An internal DMS error occurred that caused this operation to fail. Details: Please use this Error ID when contacting your Administrator for assistance. EID:(172b394523f94ac3aeaa183dd70632cc)
SqlNativeBufferBufferBulkCopy.WriteTdsDataToServer, error in OdbcDone: SqlState: 42000, NativeError: 4816, 'Error calling: bcp_done(this->GetHdbc()) | SQL Error Info: SrvrMsgState: 1, SrvrSeverity: 16,  Error <1>: ErrorMsg: [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column type from bcp client for colid 1. | Error calling: pConn->Done() | state: FFFF, number: 8861, active connections: 28', Connection String: Driver={pdwodbc17e};app=TypeD00-DmsNativeWriter:DB00000V\mpdwsvc (64100)-ODBC;autotranslate=no;trusted_connection=yes;server=\\.\pipe\_DB_31-fb745f61d208--0\sql\query;database=Distribution_25
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
    at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:283)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:129)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:37)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:26)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1589)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$300(SQLServerBulkCopy.java:65)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:663)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7418)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3272)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:697)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1654)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:620)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.bulkWrite(BulkCopyUtils.scala:110)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.savePartition(BulkCopyUtils.scala:58)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.$anonfun$write$2(BestEffortSingleInstanceStrategy.scala:43)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.$anonfun$write$2$adapted(BestEffortSingleInstanceStrategy.scala:42)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1051)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1051)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2786)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)
    at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
michaelpeele-fei commented 9 months ago

I'm experiencing this same issue/error with a standard SQL Server, but only with datetime2(0); other precisions work as normal. Changing the column data type to datetime works around the issue, but it appears this type mapping is missing in the bulk-copy path for multiple connection types.
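
For completeness, the workaround described above amounts to defining the target column as datetime instead of datetime2. A sketch based on the table from the original report (note that this is lossy: datetime is only accurate to roughly 3.33 ms, so sub-millisecond precision from the Spark timestamp is dropped):

CREATE TABLE [dbt].[d_test_t]
(
    [timestamp] [datetime] NOT NULL
)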