Closed: ChetanKardekar closed this issue 3 years ago
Hi @ChetanKardekar,
Looking at the doc for postActions, it is described as "A ; separated list of SQL commands...", which suggests the ; is required.
Have you tried this with another driver (e.g. .NET) to see if you get the same result?
Closing due to inactivity; please request to re-open the issue if necessary.
My team and I are having the exact same issue. The SQL statement that the JDBC connector executes works when run through SSMS. Maybe it's because the MERGE statement is still in preview for Azure Synapse?
I ran into the same issue: the MERGE statement works fine in Synapse Studio, but it fails when the Databricks Spark connector writes to Synapse and runs it as a postActions command.
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", synapse_JDBCURL) \
.option("user", dbutils.secrets.get(scope = secret_Scope, key = secret_SQLDBUser)) \
.option("password", dbutils.secrets.get(scope = secret_Scope, key = secret_SQLDBPasswd)) \
.option("tempDir", "abfss://" + file_System + "@" + storage_Account + ".dfs.core.windows.net/" + write_Staging_Folder) \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "wwi_staging.SaleSmall") \
.option("postActions", "MERGE wwi.SaleSmall AS tgt USING (SELECT try_cast(TransactionId as uniqueidentifier) as TransactionId, CustomerId, ProductId, sum(try_cast(Quantity as tinyint)) as Quantity, sum(Price) as Price, sum(TotalAmount) as TotalAmount, TransactionDate as TransactionDateId, sum(ProfitAmount) as ProfitAmount, Hour, Minute, StoreId FROM wwi_staging.SaleSmall group by TransactionDate, StoreId, CustomerId, ProductId, TransactionId, Hour, Minute) AS src ON (tgt.TransactionId = src.TransactionId) AND (tgt.CustomerId = src.CustomerId) AND (tgt.ProductId = src.ProductId) AND (tgt.TransactionDateId = src.TransactionDateId) AND (tgt.StoreId = src.StoreId) AND (tgt.Hour = src.Hour) AND (tgt.Minute = src.Minute) WHEN MATCHED AND tgt.Quantity <> src.Quantity THEN UPDATE SET tgt.Quantity = src.Quantity, tgt.ProfitAmount = src.ProfitAmount, tgt.TotalAmount = src.TotalAmount; ") \
.mode("overwrite").save()
Py4JJavaError Traceback (most recent call last)
My workaround for this was creating a stored procedure that executes the MERGE statement. Hacky, but it worked.
Thank you @jelther, stored procedure worked!
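The workaround above can be sketched as follows. The procedure name and MERGE body here are hypothetical, modelled on the nameTab example later in this thread: the MERGE keeps its required terminating semicolon inside the procedure body, so the connector's handling of semicolons in postActions no longer matters.

```python
# Hypothetical sketch of the stored-procedure workaround.
# Run once in Synapse (e.g. via Synapse Studio); the MERGE keeps its
# required terminating semicolon inside the procedure body:
create_proc_sql = """
CREATE PROCEDURE dbo.usp_merge_nametab AS
BEGIN
    MERGE INTO nameTab_target AS target
    USING nameTab AS source
    ON target.ID = source.ID
    WHEN MATCHED THEN UPDATE SET target.name = source.name;
END
"""

# postActions then only needs a plain EXEC, which contains no
# semicolon and so is unaffected by the connector splitting on ';':
post_actions = "EXEC dbo.usp_merge_nametab"
```

Pass `post_actions` to `.option("postActions", post_actions)` in the df.write call as before.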
Problem description
The Synapse connector uses the mssql driver to connect to Synapse and issue SQL commands. The Azure Synapse connector provides a feature called postActions: a post action is any SQL command that runs immediately after the write completes. The example below shows how to use postActions:
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<>;database=<DATABASE>;user=<>;password=<>;encrypt=true;trustServerCertificate=false;loginTimeout=30") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "nameTab") \
.mode("overwrite")\
.option('maxStrLength',4000) \
.option('postActions', postActions) \
.option("tempDir", "abfss://<>/tmp") \
.save()
postActions works for all other SQL commands (SELECT, INSERT, UPDATE, etc.), but it does not work for a MERGE command like the one below:
postActions = "MERGE into nameTab_target AS target USING nameTab AS source ON target.ID = source.ID WHEN MATCHED THEN UPDATE SET target.name = source.name"
Expected behaviour: the MERGE command should work in a post action.
Actual behaviour: the MERGE command fails with the error below.
Error message/stack trace:
Py4JJavaError: An error occurred while calling o1058.save. : com.databricks.spark.sqldw.SqlDWSideException: Azure Synapse Analytics failed to execute the JDBC query produced by the connector. Underlying SQLException(s):
com.microsoft.sqlserver.jdbc.SQLServerException: Parse error at line: 1, column: 141: Incorrect syntax near 'name'. [ErrorCode = 103010] [SQLState = S0001]
at com.databricks.spark.sqldw.Utils$.wrapExceptions(Utils.scala:441) at com.databricks.spark.sqldw.DefaultSource.createRelation(DefaultSource.scala:86) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:91) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:200) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$3(SparkPlan.scala:252) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:158) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:157) at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:999) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:836) at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:199) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:999) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:437) at 
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:421) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at py4j.Gateway.invoke(Gateway.java:295) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:251) at java.lang.Thread.run(Thread.java:748) Caused by: java.sql.SQLException: Exception thrown in awaitResult: at com.databricks.spark.sqldw.JDBCWrapper.executeInterruptibly(SqlDWJDBCWrapper.scala:135) at com.databricks.spark.sqldw.JDBCWrapper.$anonfun$executeInterruptibly$1(SqlDWJDBCWrapper.scala:113) at com.databricks.spark.sqldw.JDBCWrapper.$anonfun$executeInterruptibly$1$adapted(SqlDWJDBCWrapper.scala:113) at com.databricks.spark.sqldw.JDBCWrapper.withPreparedStatement(SqlDWJDBCWrapper.scala:307) at com.databricks.spark.sqldw.JDBCWrapper.executeInterruptibly(SqlDWJDBCWrapper.scala:113) at com.databricks.spark.sqldw.SqlDwWriter.$anonfun$saveToSqlDW$12(SqlDwWriter.scala:275) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:377) at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:363) at com.databricks.spark.util.SparkDatabricksProgressReporter$.withStatusCode(ProgressReporter.scala:34) at com.databricks.spark.sqldw.SqlDwWriter.$anonfun$saveToSqlDW$11(SqlDwWriter.scala:275) at com.databricks.spark.sqldw.SqlDwWriter.$anonfun$saveToSqlDW$11$adapted(SqlDwWriter.scala:268) at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at com.databricks.spark.sqldw.SqlDwWriter.$anonfun$saveToSqlDW$1(SqlDwWriter.scala:268) at com.databricks.spark.sqldw.SqlDwWriter.$anonfun$saveToSqlDW$1$adapted(SqlDwWriter.scala:72) at com.databricks.spark.sqldw.JDBCWrapper.withConnection(SqlDWJDBCWrapper.scala:285) at com.databricks.spark.sqldw.SqlDwWriter.saveToSqlDW(SqlDwWriter.scala:72) at com.databricks.spark.sqldw.DefaultSource.$anonfun$createRelation$3(DefaultSource.scala:118) at com.databricks.spark.sqldw.Utils$.wrapExceptions(Utils.scala:410) ... 33 more Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Parse error at line: 1, column: 141: Incorrect syntax near 'name'. at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262) at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1632) at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.doExecutePreparedStatement(SQLServerPreparedStatement.java:600) at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement$PrepStmtExecCmd.doExecute(SQLServerPreparedStatement.java:522) at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7225) at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3053) at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:247) at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:222) at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.execute(SQLServerPreparedStatement.java:503) at com.databricks.spark.sqldw.JDBCWrapper.$anonfun$executeInterruptibly$2(SqlDWJDBCWrapper.scala:113) at com.databricks.spark.sqldw.JDBCWrapper.$anonfun$executeInterruptibly$2$adapted(SqlDWJDBCWrapper.scala:113) at 
com.databricks.spark.sqldw.JDBCWrapper.$anonfun$executeInterruptibly$3(SqlDWJDBCWrapper.scala:127) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) at scala.util.Success.$anonfun$map$1(Try.scala:255) at scala.util.Success.map(Try.scala:213) at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Reproduction code
import java.sql.{Connection, DriverManager}
var conn: Connection = null
try {
  conn = DriverManager.getConnection("jdbc:sqlserver://...")
  val query = "MERGE into nameTab_target AS target USING nameTab AS source ON target.ID = source.ID WHEN MATCHED THEN UPDATE SET target.name = source.name;"
  val statement = conn.prepareStatement(query)
  try {
    statement.execute()
  } finally {
    statement.close()
  }
} finally {
  if (conn != null) {
    conn.close()
  }
}
We use the 8.2.2.jre8 SQL Server driver to connect to Synapse. The problem appears to be that the driver does not handle the MERGE command correctly without the terminating ;.
You can use the following code to create tables and insert data in SQL DW:

create table nameTab_target(ID int, name varchar(128));
insert into nameTab_target values (1, '?');
insert into nameTab_target values (2, '?');
insert into nameTab_target values (3, '?');
insert into nameTab_target values (4, '?');

create table nameTab(ID int, name varchar(128));
insert into nameTab values (1, 'A');
insert into nameTab values (2, 'B');
If you remove the ; at the end of the MERGE statement, this code fails with com.microsoft.sqlserver.jdbc.SQLServerException: Parse error at line: 1, column: 140: Incorrect syntax near 'name'.
Note that other statements work correctly with or without the terminator; e.g. both INSERT INTO nameTab_target values (6, '?'); and INSERT INTO nameTab_target values (6, '?') run fine.
The reason the postActions version still fails even with a trailing semicolon is that the Synapse connector splits the postActions string on ;, so any semicolon at the end of a statement is stripped before the statement is sent to Synapse, and MERGE (which requires the terminating semicolon) then fails to parse.
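That splitting behaviour can be illustrated with a minimal sketch; the split logic below is a hypothetical re-creation of what the connector does, not its actual code:

```python
def split_post_actions(post_actions: str):
    # Hypothetical re-creation of the connector's handling:
    # split the ';'-separated list and run each piece separately.
    return [s.strip() for s in post_actions.split(";") if s.strip()]

post_actions = (
    "MERGE into nameTab_target AS target USING nameTab AS source "
    "ON target.ID = source.ID "
    "WHEN MATCHED THEN UPDATE SET target.name = source.name;"
)

stmts = split_post_actions(post_actions)
# The single statement that reaches Synapse no longer ends with ';',
# which is exactly what MERGE cannot tolerate.
print(stmts[0].endswith(";"))  # -> False
```

This is why wrapping the MERGE in a stored procedure sidesteps the problem: the EXEC call sent through postActions needs no semicolon.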