microsoft / sql-spark-connector

Apache Spark Connector for SQL Server and Azure SQL
Apache License 2.0

Not able to insert dataframe without identity column #174

Closed · sumitppawar closed this 2 years ago

sumitppawar commented 2 years ago

In our case, the identity column value should be generated by the database.

  1. If the identity column is missing from the dataframe, we get the error: Key not found c1
  2. If we instead add the identity column with a dummy value, the SQL Server JDBC driver throws:
      com.microsoft.sqlserver.jdbc.SQLServerException: An explicit value for the identity column in table 'test_table' can only be specified when a column list is used and IDENTITY_INSERT is ON.

Using com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 with Spark 3.0.1 and SQL Server 2016 (schemaCheckEnabled is false and reliabilityLevel is NO_DUPLICATES).

How to reproduce it? Code snippets below.

SQL Table

-- test_table
CREATE TABLE test_table(
c1 bigint not null IDENTITY(1,1) ,
c2 Varchar(200)
);

Java

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import static org.apache.spark.sql.functions.lit;

    // Dataframe with a dummy value in the identity column c1.
    Dataset<Row> ds = spark
            .range(10000)
            .toDF("c2")
            .withColumn("c1", lit(0));

    ds.write()
            .format("com.microsoft.sqlserver.jdbc.spark")
            .mode(SaveMode.Overwrite)
            .option("truncate", true)
            .option("reliabilityLevel", "NO_DUPLICATES")
            .option("url", "jdbc_url")
            .option("dbtable", "test_table")
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .option("schemaCheckEnabled", "false")
            .option("tableLock", "true")
            .save();

Observations

  1. The connector no longer throws the key-not-found error, since the identity column is now present in the dataframe.
  2. Because the identity column is in the dataframe, ReliableSingleInstanceStrategy tries to insert it from the staging tables along with the other columns. The final query is:
    INSERT INTO test_table WITH (TABLOCK) 
    SELECT * from [##local-1644498591877_test_table_0] 
    UNION ALL 
    SELECT * from [##local-1644498591877_test_table_1] 
    UNION ALL 
    SELECT * from [##local-1644498591877_test_table_2]
    • In the case above, each staging table will have its own identity column values.
    • Even after enabling SET IDENTITY_INSERT test_table ON, the identity column values will be wrong and will eventually hit a constraint error.

Proposed solution

For ReliableSingleInstanceStrategy, add a new property IdentityInsert with possible values ON and OFF. If IdentityInsert is OFF:

  1. Remove the identity column check.
  2. Select only the non-identity columns from the staging tables instead of * (see the sketch after this list).
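
A rough sketch of point 2, assuming the non-identity column names of the target table have already been fetched; the helper name unionInsertSql is made up for illustration and is not part of the connector:

    // Sketch: replace "SELECT *" with an explicit non-identity column list
    // in the final union insert built by ReliableSingleInstanceStrategy.
    def unionInsertSql(table: String, nonIdentityCols: Seq[String],
                       stagingTables: Seq[String]): String = {
      val colList = nonIdentityCols.mkString(", ")
      val selects = stagingTables.map(t => s"SELECT $colList FROM [$t]")
      s"INSERT INTO $table WITH (TABLOCK) ($colList) " + selects.mkString(" UNION ALL ")
    }

For the repro table this would yield INSERT INTO test_table WITH (TABLOCK) (c2) SELECT c2 FROM [##...] UNION ALL ..., letting SQL Server generate the identity values itself.
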
TingtingH2021 commented 2 years ago

I'm running into the same problem with the default reliability level, BEST_EFFORT.

Using com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 with Spark 3.0.1 and SQL Server 2016 (schemaCheckEnabled is false and reliabilityLevel is NO_DUPLICATES).

Code snippet

import org.apache.spark.sql.{DataFrameWriter, Row}

// credential, authenticate, jdbcUrl and the credential/exception types
// are defined elsewhere in our codebase.
def write(writer: DataFrameWriter[Row], tableName: String): Unit = {
    val authenticatedWriter = credential match {
        case x: AzureActiveDirectoryApplicationAccount => authenticate(writer, x)
        case x: SQLAccount => authenticate(writer, x)
        case x => throw UnsupportedCredentialType(x.getClass.getName)
    }

    authenticatedWriter
        .format("com.microsoft.sqlserver.jdbc.spark")
        .option("url", jdbcUrl)
        .option("dbtable", tableName)
        .option("tableLock", "true")
        .option("schemaCheckEnabled", false)
        .option("hostNameInCertificate", "*.database.windows.net")
        .save()
}

Error message:

    22/03/01 21:59:49 ERROR ApplicationMaster: User class threw exception: java.util.NoSuchElementException: key not found: rowid
    java.util.NoSuchElementException: key not found: rowid
        at scala.collection.MapLike.default(MapLike.scala:235)
        at scala.collection.MapLike.default$(MapLike.scala:234)
        at scala.collection.AbstractMap.default(Map.scala:63)
        at scala.collection.MapLike.apply(MapLike.scala:144)
        at scala.collection.MapLike.apply$(MapLike.scala:143)
        at scala.collection.AbstractMap.apply(Map.scala:63)
        at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.$anonfun$matchSchemas$2(BulkCopyUtils.scala:343)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
        at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.matchSchemas(BulkCopyUtils.scala:327)
        at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.getColMetaData(BulkCopyUtils.scala:266)

Excluding the identity column check may not be limited to ReliableSingleInstanceStrategy; matchSchemas needs a change as well. Add a parameter to getComputedCols so that, when schemaCheckEnabled is false, the fetched column names exclude identity columns. The current call site is:

val computedCols = getComputedCols(conn, dbtable)
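
A sketch of what that parameter could look like; the parameter name, return type, and the sys.columns query here are assumptions for illustration, not the connector's actual implementation:

    import java.sql.Connection

    // Sketch: optionally treat identity columns like computed columns so
    // matchSchemas skips them. sys.columns flags both kinds in SQL Server.
    def getComputedCols(conn: Connection, dbtable: String,
                        alsoExcludeIdentity: Boolean): Set[String] = {
      val predicate =
        if (alsoExcludeIdentity) "is_computed = 1 OR is_identity = 1"
        else "is_computed = 1"
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery(
        s"SELECT name FROM sys.columns " +
        s"WHERE object_id = OBJECT_ID('$dbtable') AND ($predicate)")
      val cols = scala.collection.mutable.Set[String]()
      while (rs.next()) cols += rs.getString("name")
      rs.close(); stmt.close()
      cols.toSet
    }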

sumitppawar commented 2 years ago

@TingtingH2021

Even after disabling the identity column check in matchSchemas, the write to the database will still fail, because SQL Server throws an error asking to SET IDENTITY_INSERT test_table ON.

In ReliableSingleInstanceStrategy, the connector creates multiple staging tables and then inserts into the main table with a union over all of them. Here too we need to drop the identity column before the union insert into the final table. Currently the union insert looks like:

    INSERT INTO test_table WITH (TABLOCK)
    SELECT * from [##local-1644498591877_test_table_0]
    UNION ALL
    SELECT * from [##local-1644498591877_test_table_1]
    UNION ALL
    SELECT * from [##local-1644498591877_test_table_2]

We need to replace * in the query above with an explicit list of non-identity columns.

We also need to run SET IDENTITY_INSERT test_table ON while writing to all staging tables and to the main table if we want to use the identity column values from the dataframe itself (see the sketch below).
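
For reference, IDENTITY_INSERT is session-scoped in SQL Server, so the SET statement and the INSERT must run on the same connection. A minimal sketch with plain JDBC (as far as this thread shows, the connector does not expose this through an option today):

    import java.sql.Connection

    // Sketch: run an insert with IDENTITY_INSERT enabled on the same session.
    def insertWithIdentityValues(conn: Connection, table: String, insertSql: String): Unit = {
      val stmt = conn.createStatement()
      try {
        stmt.executeUpdate(s"SET IDENTITY_INSERT $table ON")
        stmt.executeUpdate(insertSql)
        stmt.executeUpdate(s"SET IDENTITY_INSERT $table OFF")
      } finally {
        stmt.close()
      }
    }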

luxu1-ms commented 2 years ago

The workaround is to add one more column to the dataframe with the same name as the identity column.
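
Applied to the repro above, that workaround would look like this in Scala (column names taken from test_table; the value is just a placeholder):

    import org.apache.spark.sql.functions.lit

    // Add a dummy column named after the identity column (c1) so the
    // connector's schema match finds the key.
    val df = spark.range(10000).toDF("c2")
      .withColumn("c1", lit(0L))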

luxu1-ms commented 2 years ago

Closing, as this is the same issue as #123.

psolomin commented 1 year ago

@sumitppawar

Even after enabling SET IDENTITY_INSERT test_table ON

What's the syntax to do SET IDENTITY_INSERT test_table ON when I specify the connection for Spark? Is there a connection string URL param for this?

            create or replace temporary view dest
            using com.microsoft.sqlserver.jdbc.spark
            options (
              'url' '{sql_url}',
              'dbtable' 'test_table',
              'user' '{sql_user}',
              'password' '{sql_password}',
              ???????
            );

            insert into dest
            select /*+ repartition({repartition_cnt}) */
              *
            from ({load_from_select_hql_template}) as source_t;