Azure / azure-sqldb-spark

This project provides a client library that allows Azure SQL DB or SQL Server to act as an input source or output sink for Spark jobs.

bulkCopyToSqlDB Table Needs to Already Exist? #47

Closed. furlong46 closed this issue 4 years ago.

furlong46 commented 5 years ago

Does the destination table in the bulk copy need to exist prior to the operation?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 238.0 failed 4 times, most recent failure: Lost task 3.3 in stage 238.0 (TID 2269, 00.000.00.0, executor 0): com.microsoft.sqlserver.jdbc.SQLServerException: Unable to retrieve column metadata.
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.getDestinationMetadata(SQLServerBulkCopy.java:1777)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1676)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:669)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:127)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:951)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2284)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2284)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Invalid object name 'ETL.Dim_SKU_Update6'.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)

InterruptSpeed commented 5 years ago

When the target DB table doesn't already exist, it will be created when this is executed:

df.bulkCopyToSqlDB(writeConfig, createTable = true)

However, any columns in the DataFrame schema that are StringType will manifest as

[nvarchar](MAX)

which presents a problem if you then try to create nonclustered indexes on those fields in the DB.

The solution I've landed on (still a work in progress) is to predefine the target tables in the DB and then wrangle the data types as best as possible.

I'm also applying page compression to the tables after the fact (regardless of approach) to sort performance out a bit further.
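For context, a full bulk-copy write with automatic table creation looks roughly like the following. This is only a sketch modelled on the library's documented Config options: the server, database, and credentials are placeholders, the table name is taken from the error above, and df is assumed to be the DataFrame being written.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Placeholder connection details -- substitute your own server, database, and credentials.
val writeConfig = Config(Map(
  "url"               -> "myserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "**********",
  "dbTable"           -> "ETL.Dim_SKU_Update6",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

// createTable = true lets the connector create the destination table if it is missing,
// but StringType columns then come out as nvarchar(max), as described above.
df.bulkCopyToSqlDB(writeConfig, createTable = true)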

furlong46 commented 5 years ago

@InterruptSpeed, does it still use nvarchar(max) if you specify the BulkCopyMetadata?
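For reference, specifying column metadata with this library looks roughly like the following. This is a sketch based on the project's documented BulkCopyMetadata.addColumnMetadata call; the column names, types, and lengths are placeholders, and writeConfig is a bulk-copy Config like the one sketched above.

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.connect._

// Column positions are 1-based; each call supplies the column name, JDBC type,
// precision/length, and scale for the destination table.
val bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "SKU", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(2, "Quantity", java.sql.Types.INTEGER, 0, 0)

df.bulkCopyToSqlDB(writeConfig, bulkCopyMetadata)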

InterruptSpeed commented 5 years ago

That's a good question. I'll have to give it a try and see. Will get back to you.

furlong46 commented 5 years ago

I tested it. It still produced nvarchar(max).

InterruptSpeed commented 5 years ago

Yikes. OK, thanks for the follow-up.

furlong46 commented 5 years ago

@InterruptSpeed, do you know if there is a way to overwrite the existing table?

InterruptSpeed commented 5 years ago

Not that I know of with this library.

Originally I was using plain old JDBC, which let me truncate the table prior to the load, but with this library I haven't come across an option to do that. The current (work in progress) game plan is to use JDBC to invoke a stored procedure that drops the existing tables (and their indexes) and then creates empty tables, all prior to doing the bulk load; then, post-load, invoke another stored proc to build indexes and apply page compression. A rough sketch of the JDBC call is shown below.
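For illustration, the pre-load stored procedure call over plain JDBC could look something like this. This is only a sketch: the connection string, credentials, and procedure names (dbo.sp_pre_refresh, dbo.sp_post_refresh) are placeholders, and the Microsoft SQL Server JDBC driver is assumed to be available, as the stack trace above already shows it on the classpath.

import java.sql.DriverManager

// Placeholder JDBC URL and credentials.
val jdbcUrl = "jdbc:sqlserver://myserver.database.windows.net:1433;databaseName=MyDatabase"
val conn = DriverManager.getConnection(jdbcUrl, "username", "**********")
try {
  // Drop the existing tables (and their indexes) and recreate them empty before the bulk load.
  conn.prepareCall("{call dbo.sp_pre_refresh}").execute()
} finally {
  conn.close()
}
// After the bulk load, call a post-load proc (for example dbo.sp_post_refresh) the same way
// to rebuild indexes and apply page compression.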

ElectricLlama commented 4 years ago

How did you go with this? I'm implementing a SQL call to truncate the target table first, as soon as I work out the Scala incantation required to add a "queryCustom" element to the config.
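If it helps, the connector's query execution path runs whatever statement is supplied under "queryCustom", so a truncate before the load might look roughly like this. This is a sketch based on the project's documented sqlDBQuery usage; the connection values and table name are placeholders, and sqlContext is the notebook's SQLContext.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._

// Placeholder connection details; "queryCustom" carries the statement to execute.
val truncateConfig = Config(Map(
  "url"          -> "myserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "**********",
  "queryCustom"  -> "TRUNCATE TABLE ETL.Dim_SKU_Update6"
))

// Executes the custom statement against the database before the bulk copy runs.
sqlContext.sqlDBQuery(truncateConfig)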

InterruptSpeed commented 4 years ago

My typical Scala notebook uses pyodbc to invoke an sp_pre_refresh stored proc (which drops the tables, in order to remove the indexes as well, rather than truncating, and then recreates them), then the notebook does the bulk inserts, and then it uses pyodbc to invoke an sp_post_refresh stored proc to rebuild the indexes. The whole thing works reliably but sure does feel like work. In my case I'm targeting a SQL Server instance, so the top of the notebook starts with a bash block to apt-get install the proprietary drivers pyodbc needs to connect to SQL Server.

ElectricLlama commented 4 years ago


It does seem like a lot of work, but it's not too different from a standard ELT process of dropping indexes, loading, and recreating them, although you could probably disable and rebuild instead, and then at least you don't need to preserve the actual index creation script with columns and all that. You could even cycle through the metadata to do it.

arvindshmicrosoft commented 4 years ago

I think there are reasonable suggestions on this thread from the community. Thank you all for contributing your advice. Closing this issue.