Azure / azure-sqldb-spark

This project provides a client library that allows Azure SQL DB or SQL Server to act as an input source or output sink for Spark jobs.
MIT License

bulkCopyToSqlDB - Datatype mapping UniqueIdentifier #86

Closed · jwinchellavtex closed this issue 4 years ago

jwinchellavtex commented 4 years ago

We are trying to use the bulkCopyToSqlDB function within Azure Databricks. Our destination staging tables contain columns of type uniqueidentifier (aka GUID), and we have been unable to map any data type to those columns.

Every mapping we have tried results in "Unable to convert character string to unique identifier."

Here is the code we are using (I have commented it out in our notebook, hence the commented-out tags here). ParticipantId, SessionId, and ProtocolCallId are all uniqueidentifier columns.
%scala

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "ParticipantId", java.sql.Types.VARCHAR, 255, 0)
bulkCopyMetadata.addColumnMetadata(2, "SessionId", java.sql.Types.VARCHAR, 255, 0)
bulkCopyMetadata.addColumnMetadata(3, "MediaTypeCode", java.sql.Types.INTEGER, 128, 0)
bulkCopyMetadata.addColumnMetadata(4, "DirectionCode", java.sql.Types.INTEGER, 128, 0)
bulkCopyMetadata.addColumnMetadata(5, "ANI", java.sql.Types.NVARCHAR, 150, 0)
bulkCopyMetadata.addColumnMetadata(6, "DNIS", java.sql.Types.NVARCHAR, 150, 0)
bulkCopyMetadata.addColumnMetadata(7, "SessionDNIS", java.sql.Types.NVARCHAR, 150, 0)
bulkCopyMetadata.addColumnMetadata(8, "EdgeId", java.sql.Types.VARCHAR, 255, 0)
bulkCopyMetadata.addColumnMetadata(9, "PeerId", java.sql.Types.VARCHAR, 255, 0)
bulkCopyMetadata.addColumnMetadata(10, "ProtocolCallId", java.sql.Types.VARCHAR, 255, 0)
bulkCopyMetadata.addColumnMetadata(11, "ProviderCode", java.sql.Types.INTEGER, 128, 0)
bulkCopyMetadata.addColumnMetadata(12, "Remote", java.sql.Types.NVARCHAR, 150, 0)
bulkCopyMetadata.addColumnMetadata(13, "IsRecording", java.sql.Types.INTEGER, 128, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "avtex-purecloud-dbserver.database.windows.net",
  "databaseName"      -> "1043-EFG-Production",
  "user"              -> "jwinchell",
  "password"          -> "Pass@Word01",
  "dbTable"           -> "dbo.Stg_Participant_Sessions",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "false",
  "bulkCopyTimeout"   -> "600"
))

spark.table("Stg_Participant_Sessions").bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)

arvindshmicrosoft commented 4 years ago

Firstly, another reminder that this project is not being actively maintained. Instead, the Apache Spark Connector for SQL Server and Azure SQL is now available, with support for Python and R bindings, an easier-to-use interface for bulk inserting data, and many other improvements. This repo will soon be archived (set to read-only). We encourage you to actively evaluate and use the new connector.
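
For orientation, here is a minimal sketch of what an equivalent append could look like with the newer connector; the server, database, table, and credentials are placeholders, and the option names should be confirmed against that project's documentation:

import org.apache.spark.sql.SparkSession
import java.util.UUID

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Small test frame with a GUID-as-string column, mirroring the example further below
val df = (1 to 10).map(i => (UUID.randomUUID().toString, i)).toDF("id", "j")

// Placeholder connection details for illustration only
val jdbcUrl = "jdbc:sqlserver://SOMESERVER.database.windows.net:1433;databaseName=SOMEDB"

df.write
  .format("com.microsoft.sqlserver.jdbc.spark") // the new connector's data source name
  .mode("append")
  .option("url", jdbcUrl)
  .option("dbtable", "dbo.guids")
  .option("user", "SOMEUSER")
  .option("password", "SOMEPASSWORD")
  .save()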

Before closing this issue, I wanted to share whatever I can on this question. I can cleanly insert GUIDs from Spark into SQL using the example below; I suggest exploring it to see if it helps. First, the sample table schema:

create table guids
(
    id uniqueidentifier,
    j int
)

Then, the construction of a test dataframe in Spark to insert into the database:

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions.udf

// UDF returning a random GUID as a 36-character string
val uuid = udf(() => java.util.UUID.randomUUID().toString)

val schema = StructType(
  StructField("id", StringType, true) ::
  StructField("j", LongType, false) :: Nil)

val df = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10)).toDF("j")
val df_with_guid = df.withColumn("id", uuid())
display(df_with_guid.select("id", "j"))
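
If it helps, the dataframe's types can be confirmed before wiring up the bulk copy metadata; the commented lines show the shape I would expect for this particular dataframe, not captured output:

// Inspect the Spark-side types that the bulk copy metadata must line up with
df_with_guid.select("id", "j").printSchema()
// root
//  |-- id: string (nullable = true)
//  |-- j: integer (nullable = false)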

At this stage, the id column in the dataframe is a string, with values like E5B6B84B-AE60-4473-9D60-0503093F7969. Let's proceed to map these columns in the bulk copy metadata and then insert into SQL:

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import org.apache.spark.sql.functions._

var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "id", java.sql.Types.VARCHAR, 36, 0)
bulkCopyMetadata.addColumnMetadata(2, "j", java.sql.Types.INTEGER, 8, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "SOMESERVER",
  "databaseName"      -> "SOMEDB",
  "user"              -> "SOMEUSER",
  "password"          -> "SOMEPASSWORD",
  "dbTable"           -> "dbo.guids",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df_with_guid.select("id", "j").limit(100).bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)

Then, from SQL, cross-check the outcome:

select * from guids

id                                   j
------------------------------------ -----------
DA782BD3-D6C3-4093-8050-467458A2B31B 2
9AE0EEFD-8501-416E-811A-659315521AC6 4
43BD0CE5-70C9-4823-879B-996ECD4234B2 5
E5B6B84B-AE60-4473-9D60-0503093F7969 7
27446CB3-3065-4860-AD25-F7248D69A217 9
EAD9F014-B3F8-4394-B681-DC6D2BE34223 10
EAAFF644-E6E2-46A8-B922-116FDAB900D2 1
F57B72E9-70D9-46D4-8887-DCDAD1E1BB16 3
15283C65-E3FD-461D-95F8-3B1E5F4B54F9 6
FFA4E2CC-3B46-499B-96B8-5F02F441ECC1 8

(11 rows affected)
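
The round trip can also be checked from Spark itself. As a minimal sketch using this library's read path (connection values are placeholders), and assuming the uniqueidentifier column comes back through JDBC as a string:

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val readConfig = Config(Map(
  "url"          -> "SOMESERVER",
  "databaseName" -> "SOMEDB",
  "user"         -> "SOMEUSER",
  "password"     -> "SOMEPASSWORD",
  "dbTable"      -> "dbo.guids"
))

// The uniqueidentifier column surfaces as a string on the Spark side
val guids = sqlContext.read.sqlDB(readConfig)
guids.show(10, truncate = false)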