microsoft / sql-spark-connector

Apache Spark Connector for SQL Server and Azure SQL
Apache License 2.0

SQL server type conversion error during bulkCopy #201

Open hiumingw opened 2 years ago

hiumingw commented 2 years ago

In our current migration effort to the new Spark connector version, we noticed some type conversion issues after switching to spark-mssql-connector ver 1.0.2 for BulkCopy. I have created two simple unit tests to reproduce this issue, one calling JDBC BulkCopy directly and one going through the Spark connector. The DB column types stay intact with the JDBC test but change with the Spark connector. Below are the steps to reproduce the issue with the Spark connector.

  1. Create a SQL table with Bit and SmallDateTime columns

    CREATE TABLE TypeTable
    ( 
                varchar_type varchar(10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
                bit_type Bit NOT NULL,
                smallDate_type SmallDateTime NOT NULL
    )
  2. Run the unit test below to insert with the Spark connector … create spark session

     val testData = Seq(
        Row("88888", 111, Timestamp.valueOf("1997-12-31 23:59:00.0")),
        Row("52803-2459", 1, Timestamp.valueOf("1997-12-31 23:59:00.0")),
        Row(" ", 0, Timestamp.valueOf("1997-12-31 23:59:00.0"))
      )
    
      val testSchema = List(
        StructField("varchar_type", StringType, true),
        StructField("bit_type", IntegerType, true),
        StructField("smallDate_type", TimestampType, true)
      )
    
      val testDF = spark.createDataFrame(
        spark.sparkContext.parallelize(testData),
        StructType(testSchema)
      )
    
      val server_name = "jdbc:sqlserver://xxx"
      val database_name = "Platform1"
      val url = server_name + ";" + "databaseName=" + database_name + ";"
    
      testDF.write
        .format("com.microsoft.sqlserver.jdbc.spark")
        .mode("overwrite")
        .option("url", url)
        .option("dbtable", "dbo.TypeTable")
        .option("user", "xxx")
        .option("password", "xxx")
        .option("batchSize", "250000")
        .option("tableLock", "true")
        .option("bulkCopyTimeout", "3000")
        .save()
  3. Check the table types. They have now changed (Bit -> Integer, SmallDateTime -> DateTime). We don't see this behavior when calling JDBC BulkCopy directly.

        TypeTable       varchar_type   SQL_Latin1_General_CP1_CI_AS    nvarchar
        TypeTable       bit_type           NULL   int
        TypeTable       smallDate_type           NULL   datetime
    
        Before the test ran, the table schema was 
        TypeTable       varchar_type   SQL_Latin1_General_CP1_CI_AS    varchar
        TypeTable       bit_type           NULL   bit
        TypeTable       smallDate_type           NULL   smalldatetime
moredatapls commented 2 years ago

I think this happens because you chose overwrite when writing the table. As per the documentation here, overwrite will first drop the table and then create it again:

Important: using the overwrite mode will first DROP the table if it already exists in the database by default. Please use this option with due care to avoid unexpected data loss!

It does this using the schema of your data frame and the mappings here and here, which explains why the schema changes.

There are two options for you to fix this:

val testSchema = List(
  StructField("varchar_type", VarcharType(12), true), // put your length here
  StructField("bit_type", BooleanType, true),
  StructField("smallDate_type", TimestampType, true) // I don't think that spark supports smalldatetime
)
hiumingw commented 2 years ago

@moredatapls, really appreciate your input, you are right on.

I tried setting option("truncate", true) and got errors in getColMetaData. I see it is because the checkSchema flag is hard-coded to true. I actually tested writing the same data by calling JDBC bulkCopy directly, and the driver is able to convert Integer to Bit and Timestamp to SmallDateTime even when the dataframe type and database type are different. I wonder if we should add a third option to allow truncate while skipping checkSchema (with the default being true) so the underlying JDBC driver can perform the conversions?

In Connector.scala

    case SaveMode.Overwrite =>
      if (options.isTruncate) {
        logInfo(s"Overwriting with truncate for table '${options.dbtable}'")
        val colMetaData = getColMetaData(df, conn, sqlContext, options, true)
        mssqlTruncateTable(conn, options.dbtable)
        writeInParallel(df, colMetaData, options, sqlContext.sparkContext.applicationId)
      }
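
Something along these lines could work (the options.isSchemaCheckEnabled accessor below is only a placeholder name for whatever flag the connector would expose, not the actual field):

    case SaveMode.Overwrite =>
      if (options.isTruncate) {
        logInfo(s"Overwriting with truncate for table '${options.dbtable}'")
        // Hypothetical change: pass the schema-check setting through instead of the
        // hard-coded true, so the JDBC driver can perform its own conversions.
        val colMetaData = getColMetaData(df, conn, sqlContext, options, options.isSchemaCheckEnabled)
        mssqlTruncateTable(conn, options.dbtable)
        writeInParallel(df, colMetaData, options, sqlContext.sparkContext.applicationId)
      }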
moredatapls commented 2 years ago

@hiumingw did you set .option("schemaCheckEnabled", false) when writing the data?
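
For example, alongside the truncate option you tried, roughly (a sketch reusing the write from your repro; option values as strings like the others there):

    testDF.write
      .format("com.microsoft.sqlserver.jdbc.spark")
      .mode("overwrite")
      .option("truncate", "true")             // keep the existing table definition
      .option("schemaCheckEnabled", "false")  // skip the strict DataFrame/table schema check
      .option("url", url)
      .option("dbtable", "dbo.TypeTable")
      .option("user", "xxx")
      .option("password", "xxx")
      .save()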

hiumingw commented 2 years ago

Thanks @moredatapls for the pointer. I didn't see this option earlier. After adding it, the write went a little further but failed with a type conversion error. I am checking the JDBC code.

moredatapls commented 2 years ago

What exactly is the error that you're getting? Just out of curiosity

hiumingw commented 2 years ago

@moredatapls yeah, it is failing here in SQLServerBulkCopy.java when casting (Boolean) colValue:

            case java.sql.Types.BIT:
                if (null == colValue) {
                    writeNullToTdsWriter(tdsWriter, bulkJdbcType, isStreaming);
                } else {
                    if (bulkNullable) {
                        tdsWriter.writeByte((byte) 0x01);
                    }
                    tdsWriter.writeByte((byte) ((Boolean) colValue ? 1 : 0));
                }  

It appears DataFrameBulkRecord sets bulkJdbcType to match the BIT type in the SQL table, when it should only be setting the destination type. If bulkJdbcType is set to INTEGER and the destination type is BIT, the code below gets executed and should work properly:

    // We are sending the data using JDBCType and not using SSType as SQL Server will
    // automatically do the conversion.
    switch (bulkJdbcType) {
        case java.sql.Types.INTEGER:
            if (null == colValue) {
                writeNullToTdsWriter(tdsWriter, bulkJdbcType, isStreaming);
            } else {
                if (bulkNullable) {
                    tdsWriter.writeByte((byte) 0x04);
                }
                tdsWriter.writeInt((int) colValue);
            }
            break;
shanyu commented 2 years ago

Since the destination table column type is bit, should you use BooleanType in your Spark code instead of StructField("bit_type", IntegerType, true)?

hiumingw commented 2 years ago

Thanks @shanyu for your comments. In most cases we have the data types matched in both the dataframe and the SQL schema, but there are cases where the dataframe has different types; this is expected in the data processing world, and we rely on the data conversion done by the JDBC driver.

Having said that, the JDBC driver used by the Spark connector is able to perform narrowing casts when bulkCopy is called. We have created a bulkCopy test case for this as well and can confirm the narrowing cast works as expected, i.e. if integers are bulk-copied to a column of BIT type, 1 is inserted for non-zero values and 0 for zero; the same goes for timestamp values written to a SmallDateTime column:

    import java.sql.{Connection, DriverManager}
    import com.microsoft.sqlserver.jdbc.{SQLServerBulkCSVFileRecord, SQLServerBulkCopy}

    // connectionUrl is defined elsewhere
    val connection: Connection = DriverManager.getConnection(connectionUrl)
    try {
      val csvPath = "C:/jdbc/test.txt"
      // Pipe-delimited input file, no header row
      val fileRecord = new SQLServerBulkCSVFileRecord(csvPath, null, "\\|", false)
      fileRecord.addColumnMetadata(1, null, java.sql.Types.NVARCHAR, 50, 0)
      fileRecord.addColumnMetadata(2, null, java.sql.Types.INTEGER, 50, 0)
      fileRecord.addColumnMetadata(3, null, java.sql.Types.TIMESTAMP, 50, 0)

      val bulkCopy = new SQLServerBulkCopy(connection)
      bulkCopy.setDestinationTableName("dbo.TypeTable")
      bulkCopy.writeToServer(fileRecord)
    } finally {
      connection.close()
    }
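
For reference, a hypothetical test.txt matching the column metadata above (nvarchar | integer | timestamp), built from the same rows as the Spark repro:

    import java.nio.file.{Files, Paths}

    // Pipe-delimited, no header row, as SQLServerBulkCSVFileRecord is configured above.
    // 111 and 0 end up in the BIT column as 1 and 0 via the driver's conversion.
    Files.write(Paths.get("C:/jdbc/test.txt"), java.util.Arrays.asList(
      "88888|111|1997-12-31 23:59:00.0",
      "52803-2459|1|1997-12-31 23:59:00.0",
      " |0|1997-12-31 23:59:00.0"))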
shanyu commented 2 years ago

Hi @hiumingw, can you please try using the Spark JDBC connector? Our sql-spark-connector is based on the Spark JDBC connector, so the behavior might be imposed by Spark. https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

If the data types don't match, you could cast the integer column to boolean in the DataFrame before calling into our connector.
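
For example, a minimal sketch using the testDF from the repro (Spark treats non-zero integers as true when casting to boolean):

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.BooleanType

    // Align the DataFrame with the destination BIT column before handing it to the connector.
    val alignedDF = testDF.withColumn("bit_type", col("bit_type").cast(BooleanType))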

hiumingw commented 2 years ago

Thanks Shanyu. I am using the spark-connector. The issue is that there appears to be an inconsistency in the Spark connector logic when setting the jdbcType and the destination SSType.

As a last resort, we might have to find all the columns with type differences and add code to explicitly cast them in many of our datasets, but I am hoping the Spark connector library can address this at a lower level, since automatic type conversion is supported by the JDBC driver.

shanyu commented 2 years ago

If the Spark JDBC connector behaves the same as sql-spark-connector, there is nothing we could or should do. Either raise an issue against the Spark JDBC connector in the Spark code base, or convert the column types in the dataframe.

hiumingw commented 2 years ago

The Spark JDBC connector doesn't support SQL Server bulkCopy.

luxu1-ms commented 2 years ago

Spark JDBC will convert the columns to the destination table's datatypes so that the write works (if you try the snippet below with append mode, the destination table is not dropped, so the datatypes of the result stay the same as in the SQL table). But the BulkCopy that the Spark MSSQL connector uses matches the origin and destination datatypes, and this causes the error.

df.write \
    .format("jdbc") \
    .mode("append") \
   ...
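
For comparison, a minimal Scala sketch of that plain Spark JDBC append path, reusing the url and testDF from the repro above (credentials are placeholders):

    // Plain Spark JDBC source: no bulk copy, but append mode leaves the table definition
    // alone and converts the DataFrame columns to the destination types on write.
    testDF.write
      .format("jdbc")
      .mode("append")
      .option("url", url)
      .option("dbtable", "dbo.TypeTable")
      .option("user", "xxx")
      .option("password", "xxx")
      .save()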
hiumingw commented 2 years ago

Yeah, I think the question is why the Spark MS connector sets the source type to match the destination datatype even when schemaCheckEnabled is false. The JDBC driver will do the proper conversion if the source type is left untouched.

luxu1-ms commented 2 years ago

The original issue ("table column types change after BulkCopy") is because the mode used is "overwrite" instead of "append". "overwrite" without "isTruncate=true" drops the SQL table and creates a new table using the Spark DataFrame schema.

Then the question became "why does the Spark MSSQL connector match the source datatype to the destination datatype?" Please note that the Spark MSSQL connector uses SQLServerBulkCopy to write data to the SQL table. I collected the error log of one test case. It looks like SQLServerBulkCopy.writeColumnToTdsWriter() failed on a datatype conversion. Please see the source code here.

2022-11-01 18:27:21,953 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (storage-0-0.storage-0-svc.test.svc.cluster.local executor 1): com.microsoft.sqlserver.jdbc.SQLServerException: An error occurred while converting the Long value to JDBC data type BIT.
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeColumnToTdsWriter(SQLServerBulkCopy.java:2567)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeColumn(SQLServerBulkCopy.java:3069)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeBatchData(SQLServerBulkCopy.java:3633)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1566)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$300(SQLServerBulkCopy.java:64)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:662)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7375)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3206)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:696)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1654)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:619)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.bulkWrite(BulkCopyUtils.scala:108)
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.savePartition(BulkCopyUtils.scala:56)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.$anonfun$write$2(BestEffortSingleInstanceStrategy.scala:43)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.$anonfun$write$2$adapted(BestEffortSingleInstanceStrategy.scala:42)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Boolean
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeColumnToTdsWriter(SQLServerBulkCopy.java:2111)
    ... 25 more
hiumingw commented 2 years ago

Yes, following the stack trace above and opening SQLServerBulkCopy.java at line 2567, you will find this comment in the same function: // We are sending the data using JDBCType and not using SSType as SQL Server will automatically do the conversion.

If the JDBC type is set to Integer and the SSType is Boolean, we won't see this error and SQL Server will handle the conversion. But in the current Spark connector code they are both set to Boolean, hence the casting error is thrown.

Rafnel commented 3 weeks ago

Just wanted to confirm I am seeing this same issue:

  1. I used to use the default SQL Server JDBC connector, but it doesn't support bulk inserts, so it can be very inefficient (i.e. it executes one insert statement at a time even with a massive dataset).
  2. I searched online and found this connector and confirmed it supports bulk inserts into SQL Server. Great!
  3. I tried running it with my project and found that this connector requires an exact match between the DataFrame column types and the SQL Server column types. The default Spark SQL Server JDBC connector seems to perform the casting from one type to another implicitly, i.e. I have a file with 150 columns, I read the file in with Spark, all columns are given a string type, then I write to SQL Server and the old JDBC connector would cast string -> decimal, string -> datetime, etc. I have around 80 tables with different schemas in SQL, so it would be quite inconvenient to manually maintain 80 copies of the schemas in Spark when it's not really necessary.

It would be nice if this connector could support the implicit casts that the default JDBC connector seems to do.
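
In the meantime, a rough sketch of the workaround I'm considering, assuming the destination schema can be borrowed through the plain Spark JDBC reader and the columns cast to match before writing with this connector (url, user and password are placeholders):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.col

    // Read only the destination table's schema via the plain JDBC source, then cast each
    // DataFrame column to the corresponding SQL Server type before the bulk write.
    def alignToSqlSchema(spark: SparkSession, df: DataFrame, url: String, table: String): DataFrame = {
      val targetSchema = spark.read
        .format("jdbc")
        .option("url", url)
        .option("user", "xxx")
        .option("password", "xxx")
        .option("query", s"SELECT * FROM $table WHERE 1 = 0") // schema only, no rows
        .load()
        .schema

      targetSchema.foldLeft(df) { (acc, field) =>
        acc.withColumn(field.name, col(field.name).cast(field.dataType))
      }
    }

This wouldn't remove the need for compatible values, but it would avoid maintaining 80 hand-written schemas in Spark.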