microsoft / sql-spark-connector

Apache Spark Connector for SQL Server and Azure SQL
Apache License 2.0

Error with reliabilityLevel = NO_DUPLICATES and sequence in primary key #230

Open alexey-abrosin opened 1 year ago


Hello guys. I bumped into a bug when writing to a table that has both a sequence as its primary-key default and the `reliabilityLevel = NO_DUPLICATES` option set. Steps to reproduce:

- Create the target table

```sql
drop table if exists stg.test_table;

create table stg.test_table (
    pk int not null primary key clustered
        default (next value for stg.test_table_seq),
    some_long_value int not null unique with (ignore_dup_key = on)
);
```

- Build and run the Spark application

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

import java.util

object Main {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("TestApp")
      .getOrCreate()

    val schema: StructType = StructType(
      Seq(
        StructField("pk", IntegerType),
        StructField("some_long_value", IntegerType)
      )
    )

    val rows: java.util.List[Row] = new util.LinkedList[Row]()
    rows.add(Row(null, 1000))
    rows.add(Row(null, 1001))
    rows.add(Row(null, 1003))
    rows.add(Row(null, 1003))
    rows.add(Row(null, 1004))
    rows.add(Row(null, 1004))
    rows.add(Row(null, 1005))
    val df: DataFrame = spark.createDataFrame(rows, schema)

    df.write
      .format("com.microsoft.sqlserver.jdbc.spark")
      .mode(SaveMode.Append)
      .option("url", "jdbc:sqlserver://localhost:1433;database=MSSQLDB")
      .option("dbtable", "stg.test_table")
      .option("user", "admin")
      .option("password", "qwerty123")
      .option("truncate", "true")
      .option("batchsize", "10000")
      .option("reliabilityLevel", "NO_DUPLICATES")
      .option("tableLock", "true")
      .option("schemaCheckEnabled", "false")
      .save()
  }
}
```

- Get errors

```
23/07/03 13:18:53 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4) (os-2912.ajax.io executor 3): com.microsoft.sqlserver.jdbc.SQLServerException: Cannot insert the value NULL into column 'pk', table 'tempdb.dbo.##application_1670426294525_70004_stg.test_table_4'; column does not allow nulls. INSERT fails.
23/07/03 13:18:53 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3) (os-2913.ajax.io executor 2): com.microsoft.sqlserver.jdbc.SQLServerException: Cannot insert the value NULL into column 'pk', table 'tempdb.dbo.##application_1670426294525_70004_stg.test_table_3'; column does not allow nulls. INSERT fails.
23/07/03 13:18:53 WARN TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2) (os-2910.ajax.io executor 1): com.microsoft.sqlserver.jdbc.SQLServerException: Cannot insert the value NULL into column 'pk', table 'tempdb.dbo.##application_1670426294525_70004_stg.test_table_2'; column does not allow nulls. INSERT fails.
...
```
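If it helps triage: the failing table in each message is a global temp staging table (`##application..._stg.test_table_N`), so my reading is that `NO_DUPLICATES` bulk-copies each partition into a staging copy of the destination before moving distinct rows over. A staging copy would keep the `NOT NULL` constraint, but the bulk copy sends an explicit `NULL` for `pk`, so the sequence default never gets a chance to fire. A minimal sketch of that failure mode (the exact DDL the connector generates is an assumption on my part):

```sql
-- Hypothetical staging table modeled on the destination:
-- NOT NULL survives, but the sequence default is not attached here.
create table ##stg_test_table_0 (
    pk int not null,
    some_long_value int not null
);

-- This mirrors what the bulk copy effectively does with Row(null, 1000):
-- an explicit NULL is supplied for pk, so the insert is rejected.
insert into ##stg_test_table_0 (pk, some_long_value)
values (null, 1000);
```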


If I use `BEST_EFFORT` instead of `NO_DUPLICATES`, the app works like a charm and fills the target table:

```
select * from stg.test_table;

+--+---------------+
|pk|some_long_value|
+--+---------------+
|1 |1003           |
|2 |1004           |
|4 |1000           |
|6 |1005           |
|7 |1001           |
+--+---------------+
```
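For anyone hitting the same thing, a workaround that might sidestep the staging insert entirely (untested sketch; it assumes the connector only writes the columns present in the DataFrame, so the server-side sequence default can populate `pk`) is to drop `pk` before the write:

```scala
// Untested sketch: write only some_long_value and let the table's
// sequence default populate pk on the server side.
df.drop("pk")
  .write
  .format("com.microsoft.sqlserver.jdbc.spark")
  .mode(SaveMode.Append)
  .option("url", "jdbc:sqlserver://localhost:1433;database=MSSQLDB")
  .option("dbtable", "stg.test_table")
  .option("user", "admin")
  .option("password", "qwerty123")
  .option("reliabilityLevel", "NO_DUPLICATES")
  .save()
```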



My env:
- Microsoft SQL Server 2019 (RTM) - 15.0.2000.5 (X64) 
  Sep 24 2019 13:48:23 
  Copyright (C) 2019 Microsoft Corporation
  Developer Edition (64-bit) on Windows Server 2019 Standard 10.0 <X64> (Build 17763: )
- Spark 3.2.1
- Scala 2.12.15
- spark-mssql-connector 1.2.0