apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Unable to merge CDC data into snapshot data. java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long #8333

Open chandu-1101 opened 1 year ago

chandu-1101 commented 1 year ago

Apache Iceberg version

1.3.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

  1. I created an Iceberg table from an existing Parquet file.
  2. Then I tried to merge CDC data into it and got the error below.

--> I am not sure whether this issue is caused by bad data. If so, how do I debug it?

EMR: 6.9.0, Spark: 3.3.0, Scala: 2.12

spark shell command

spark-shell --driver-memory 1g --executor-memory 1g --executor-cores 1 --driver-cores 1 \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.minPartitionNum=1 \
  --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
  --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
  --conf spark.yarn.maxAppAttempts=1 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --files /home/hadoop/jars/log4j2.properties \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
  --conf "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2" \
  --name ravic \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1 \
  --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar

Exception::

Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 2711) (ip-172-25-26-205.prod.phenom.local executor 23): java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:95)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_32_62$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.addMetadataColumnsIfNeeded(FileScanRDD.scala:291)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:318)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:184)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:

Code

    val sess = Application.spark()

    // Load the Parquet snapshot and create the Iceberg table from it.
    val snapshotDf = sess.read.parquet("s3://bucket/snapshots2/ge11-partitioned/")
    // val _snapshotDf = snapshotDf.sortWithinPartitions("__created_date_")
    snapshotDf.createOrReplaceTempView("snapshot")
    sess.sql(""" CREATE TABLE ge112 
                  USING iceberg 
                  TBLPROPERTIES ('key'='_id.oid') 
                  location 's3://bucket/snapshots2/ge11-ice2/'  
                  as 
                    select * from snapshot """)
    sess.sql(""" alter table ge112 add partition field __created_date_  """)

    // Read the CDC JSON files using the schema of the snapshot DataFrame.
    // val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/GE11GLOBAL_candidates-CandidatesList.json")
    val cdcDf = sess.read.schema(snapshotDf.schema).json("s3://bucket/inputs2/ge11-partitioned/")
    cdcDf.createOrReplaceTempView("cdc")

    // Keep only the latest CDC record (max cdc_pk) per _id.oid and created date.
    val _cdcDf = sess.sql(""" select * 
                             from cdc c1 
                             where cdc_pk in ( 
                                                select max(cdc_pk) 
                                                from cdc c2 
                                                where _id.oid is not null 
                                                  and _id.oid !='' 
                                                  and c2.__created_date_=c1.__created_date_  
                                                group by _id.oid)  """)
    // createOrReplaceTempView replaces the deprecated registerTempTable.
    _cdcDf.createOrReplaceTempView("_cdc")

    // Merge the deduplicated CDC rows into the Iceberg table; this is the step that fails.
    sess.sql(""" MERGE INTO ge112 t 
                using ( 
                        select * 
                        from _cdc )u 
                on t._id.oid = u._id.oid 
                  when matched then update set * 
                  when not matched then insert * """)

nastra commented 1 year ago

can you please share the schema of the Parquet file and of the Iceberg table? It seems that there's a schema mismatch

chandu-1101 commented 1 year ago

Hi,

The input to the Iceberg table was JSON when the exception happened. Below is the schema from the Iceberg table's metadata folder.

The schema that was applied to the JSON files when creating the DataFrame is also attached.

parquet-schema.txt

iceberg-schema.txt

nastra commented 1 year ago

@RussellSpitzer any idea what would cause this?

chandu-1101 commented 1 year ago

Any update? I still get the exception.

scala>     val _cdcDf =sess.sql(""" select *
     |                              from cdc c1
     |                              where cdc_pk in (
     |                                                 select max(cdc_pk)
     |                                                 from cdc c2
     |                                                 where _id.oid is not null
     |                                                   and _id.oid !=''
     |                                                   and c2.__created_date_=c1.__created_date_
     |                                                 group by _id.oid)  """)
_cdcDf: org.apache.spark.sql.DataFrame = [cdc_pk: string, cdc_oid: string ... 174 more fields]

scala>     _cdcDf.registerTempTable("_cdc")

scala>  sess.sql(""" MERGE INTO x11  t
     |                 using (
     |                         select *
     |                         from _cdc )u
     |                 on t._id.oid = u._id.oid
     |                   when matched then update set *
     |                   when not matched then insert * """)
23/08/17 07:00:37 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
23/08/17 07:12:45 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 1764) (ip-172-25-26-76.prod.x.local executor 8): java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:95)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_32_62$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.addMetadataColumnsIfNeeded(FileScanRDD.scala:291)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:318)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:184)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

RussellSpitzer commented 1 year ago

I would have guessed the same thing: a schema mismatch. I would double-check the schemas of the two sides of the merge rather than the underlying files; in this case, that means the Spark schema produced by the source DataFrame and the table schema. Confirming that those match would be the next step, I think.
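
A minimal sketch of how such a comparison could be automated, assuming both schemas have already been flattened into maps from dotted column path to type name (for example, from the source DataFrame's schema and the table schema). The class name `SchemaDiff` and the sample columns and types are hypothetical, not taken from the attached schema files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper: compares two schemas that were already flattened
// into "dotted.column.path" -> "type name" maps.
class SchemaDiff {

    // Returns one readable line per column whose type differs or that
    // exists on only one side; an empty list means the maps match.
    static List<String> diff(Map<String, String> source, Map<String, String> target) {
        TreeSet<String> allPaths = new TreeSet<>(source.keySet());
        allPaths.addAll(target.keySet());

        List<String> differences = new ArrayList<>();
        for (String path : allPaths) {
            String sourceType = source.get(path);
            String targetType = target.get(path);
            if (sourceType == null) {
                differences.add(path + ": missing in source, target has " + targetType);
            } else if (targetType == null) {
                differences.add(path + ": missing in target, source has " + sourceType);
            } else if (!sourceType.equals(targetType)) {
                differences.add(path + ": source " + sourceType + " vs target " + targetType);
            }
        }
        return differences;
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        Map<String, String> cdc = new TreeMap<>();
        cdc.put("_id.oid", "string");
        cdc.put("cdc_pk", "string");

        Map<String, String> table = new TreeMap<>();
        table.put("_id.oid", "string");
        table.put("cdc_pk", "long");

        for (String line : diff(cdc, table)) {
            System.out.println(line);
        }
    }
}
```

Note that this compares columns by name only; it says nothing about column order, so two schemas can pass this check and still differ positionally.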

chandu-1101 commented 1 year ago

I am trying to understand the schema issue.

We first generate Parquet files from Mongo. We create a DataFrame over these Parquet files, take its schema, and apply it to the DataFrame created over the CDC JSON files. So the CDC and Parquet data essentially share one schema: the schema used to generate the Parquet files, which were in turn written to Iceberg, is the same one used to write the CDC data to Iceberg.

I reran the test and extracted the schemas from the Iceberg table and the Parquet files. I could not find any reference to long in the schema files, yet the exception was: Caused by: java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long.

The merge ran for about 30 minutes, and roughly 377 of about 900 tasks completed successfully. Given that it ran for a while, can it still be a schema issue? I am trying to understand.

iceberg-schema.txt

snapshot-schema.txt (generated from the DataFrame over the set of input partitioned Parquet files)

cdc-schema.txt

I suspected a bad row in the JSON and tried to filter it out as follows, but this did not work either.

    val cdcDf = sess.read
      .schema(snapshotDf.schema)
      .option("mode", "PERMISSIVE")
      .option("dateformat", "yyyy-MM-dd")
      .option("badRecordsPath", "s3://bucket/inputs2/bad-records/")
      .json("s3://bucket/inputs2/x11-partitioned/")
    cdcDf.createOrReplaceTempView("cdc")

    sess.sql(""" MERGE INTO x11 t 
                using ( 
                        select * 
                        from cdc )u 
                on t._id.oid = u._id.oid 
                  when matched then update set * 
                  when not matched then insert * """)

nastra commented 1 year ago

As @RussellSpitzer mentioned above, I would try and look at the schema produced by the data frames

chandu-1101 commented 1 year ago

I checked the schemas and there is no long datatype in them. The schemas match perfectly; they were also shared previously.

To narrow down the issue, I tried the following and found that the issue appears when the MERGE statement contains an update clause. Even if I update only one string column, I still get the ClassCastException.

If I change the merge query to the one below, everything works. I also ran it without the limit, and it still worked. The schema of the select ... limit 100 matches the schema of x11.

sess.sql(""" MERGE INTO x11  t 
                 using ( select * from cdc limit 100)  u 
                 on t.cdc_oid = u.cdc_oid 

                   when not matched then insert * """)

But if I change it to the following:

sess.sql(""" MERGE INTO x11 t 
                 using ( select * from cdc limit 100)  u 
                 on t.cdc_oid = u.cdc_oid
                   when matched then update set t.cdc_pk = '' 
                   when not matched then insert * """)

then the merge fails with an exception (Caused by: java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long).

When I compare the type of cdc_oid in x11 and in cdc, both are strings.

chandu-1101 commented 1 year ago

Any thoughts, @nastra @RussellSpitzer? The schemas look good and they match. Neither contains a long datatype, yet the issue persists.

gabrywu commented 1 year ago

any updates here? same issue at 1.2.1 version

gabrywu commented 1 year ago

@chandu-1101 have you resolved it?

chandu-1101 commented 1 year ago

No, not resolved.

The schema is perfectly good. As suggested in the previous comments by @RussellSpitzer and @nastra, I compared the schemas of the CDC data and the Iceberg table. They match perfectly. (I compared them by printing the schema of the Iceberg table and comparing it with the schema of the DataFrame over the CDC JSON files. In fact, the CDC JSON files were given the same schema as the Iceberg table; the code is in my previous posts.)

In addition, I am able to merge the JSON CDC data into the Parquet snapshot using vanilla Spark SQL (i.e., without Iceberg) without issues. That is, I first dump the MongoDB data to S3 as Parquet files, derive the schema by scanning all the MongoDB rows (the config is in my previous posts), and apply the same schema to the JSON CDC data. Merging these two DataFrames with vanilla Spark SQL works perfectly!

Given the above, I strongly think it's not a schema issue. But others on this thread still say it is, so I am currently in a catch-22 situation.

The only option I had was to add columns one by one and check where things break. But I have 719 columns, many of them of complex datatypes (maps, arrays, maps with arrays with maps), which makes testing column by column impractical.
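
If the failure were tied to a single column (which this thread does not establish), bisection would need roughly ten probes for 719 columns instead of 719. The sketch below shows only the search logic; `ColumnBisect` and the probe are hypothetical, and in practice the probe would run the merge restricted to the given columns and report whether it threw.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper: narrows a failing column set by bisection.
class ColumnBisect {

    // failsWith.test(columns) must return true when the operation restricted
    // to `columns` fails. Assumes the full list fails and that exactly one
    // column is responsible.
    static String findCulprit(List<String> allColumns, Predicate<List<String>> failsWith) {
        if (allColumns.isEmpty()) {
            throw new IllegalArgumentException("no columns to search");
        }
        List<String> candidates = new ArrayList<>(allColumns);
        while (candidates.size() > 1) {
            int mid = candidates.size() / 2;
            List<String> left = new ArrayList<>(candidates.subList(0, mid));
            List<String> right = new ArrayList<>(candidates.subList(mid, candidates.size()));
            // Keep whichever half still reproduces the failure.
            candidates = failsWith.test(left) ? left : right;
        }
        return candidates.get(0);
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("cdc_pk", "cdc_oid", "_id", "__created_date_");
        // Stand-in for "run the MERGE with only these columns and catch the exception".
        Predicate<List<String>> fakeProbe = subset -> subset.contains("_id");
        System.out.println(findCulprit(columns, fakeProbe));
    }
}
```

If the error shows up regardless of which columns are selected, as the single-column update test earlier in the thread suggests, the search would not converge on anything meaningful, which is itself a hint that the cause is not column-specific.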

MatatatDespe commented 1 year ago

FYI: if you use an insert/Overwrite it will work

chandu-1101 commented 1 year ago

what is the rationale?

juanrondineau commented 1 year ago

Hi, we are running Spark jobs from the dbt framework, and we hit the same issue when we run a model with the merge strategy whose source is a table with struct-typed columns in its schema. Internally, for models with the merge strategy, dbt creates a temporary view with the data to be merged into the destination table. We recently discovered that if, instead of a temporary view, we materialize the new data as a table and then run the merge, we avoid the ClassCastException and the merge runs OK. That seems to be a workaround rather than a final solution to this issue. We are currently running Spark 3.3.2, Iceberg 1.3.0, and Hive 2.3.9 as the metastore. Hope this helps to solve the case.

ext-diego-duenas commented 1 year ago

Just to add to the thread: we ran a test changing all data types to string in the table with struct columns, and the error persists, so we don't think the error is related to the schema. It seems to occur whenever MERGE INTO is applied to an Iceberg table with a source table that has struct data types, regardless of the underlying schema.

chandu-1101 commented 1 year ago

@juanrondineau , @ext-diego-duenas Glad to see updates on this thread.

@nastra even I don't think this is a schema issue!

@juanrondineau Can you elaborate on the following?

We materialize the new data as a table and then we run the merge

In my case, I had a new Iceberg table with the existing data from the snapshot added to it. That is, the Iceberg table has a single version, more like a materialized view (my understanding of a materialized view here is: (a) the format is Iceberg, and (b) only one version of the data is present). Please correct me if I am wrong.

juanrondineau commented 1 year ago

@chandu-1101, thanks for the welcome. I am sharing two screenshots. The first simulates, in a DBeaver session connected to Spark, the operations that dbt executes internally. In this case dbt creates a temporary view from a select over the table where we look for new data. Then, when it tries to merge the new data into the destination table, we get the cast exception.

[image]

In the second screenshot, we replace the CREATE TEMPORARY VIEW statement with a CREATE TABLE statement; this avoids the exception and the merge operation works fine.

[image]
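
Since the screenshots are not reproduced here, the following is a rough sketch of the statement sequence that workaround implies. It only builds SQL text (each string would then be passed to `spark.sql(...)`); the class name, the staging table name, the `USING iceberg` clause, and the final `DROP TABLE` are assumptions for illustration, not what dbt generates.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that only builds SQL text; nothing here talks to Spark.
class MaterializeThenMerge {

    // Returns the statements in execution order: materialize the source
    // query as a table, merge from that table, then drop the staging table.
    static List<String> statements(String target, String staging, String sourceQuery, String joinKey) {
        String create = "CREATE TABLE " + staging + " USING iceberg AS " + sourceQuery;
        String merge = "MERGE INTO " + target + " t USING (SELECT * FROM " + staging + ") u"
                + " ON t." + joinKey + " = u." + joinKey
                + " WHEN MATCHED THEN UPDATE SET *"
                + " WHEN NOT MATCHED THEN INSERT *";
        String drop = "DROP TABLE " + staging;
        return Arrays.asList(create, merge, drop);
    }

    public static void main(String[] args) {
        // Table and column names below are placeholders.
        for (String sql : statements("x11", "x11_staging", "SELECT * FROM cdc", "cdc_oid")) {
            System.out.println(sql + ";");
        }
    }
}
```

The cost of this approach is an extra write and read of the source data before every merge.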

harshith-bolar-rapido commented 11 months ago

Any update on this? Facing the same issue with 1.2.1 and spark 3.3.0

chandu-1101 commented 11 months ago

Abandon Iceberg, Abandon Hudi. Both are useless frameworks. Go with Parquet and spark sql. Your life will be simple and happy. All the point-in-time recovery, and merge on write are easy to achieve.

harshith-bolar-rapido commented 11 months ago

Noticed that this happens when the input folders are partitioned by date. Ex: yyyymmdd=20230304

Here's a minimal reproducible example. @RussellSpitzer @nastra

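import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MRE {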

    private static SparkConf getSparkConf() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.master", "local[*]");
        sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");

        // Catalog configs
        sparkConf.set("hive.metastore.uris", "thrift://metastore-url:9083");
        sparkConf.set("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog");
        sparkConf.set("spark.sql.catalog.hive.type", "hive");
        sparkConf.set("spark.sql.catalog.hive.uri", "thrift://metastore-url:9083");
        sparkConf.set("spark.sql.catalogImplementation", "hive");
        sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog");
        sparkConf.set("spark.sql.catalog.spark_catalog.type", "hive");

        return sparkConf;
    }

    private static SparkSession getSparkSession(SparkConf conf) {
        return SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
    }

    public static void main(String[] args) {
        SparkSession spark = getSparkSession(getSparkConf());

        Dataset<Row> df = spark
                .read()
                .option("basePath", "/tmp/")
                .json("/tmp/data/yyyymmdd=20220202");

        String namespace = "temp";
        String tableName = "iceberg_test_table";

        System.out.println("Creating table and writing..");
        df.writeTo("hive." + namespace + "." + tableName)
                .tableProperty("location", "/tmp/iceberg/" + tableName)
                .tableProperty("write.format.default", "orc")
                .createOrReplace();

        System.out.println("Writing again using MERGE INTO");
        df.createOrReplaceTempView("tmp_view");
        String mergeIntoQuery = "MERGE INTO hive." + namespace + "." + tableName + " AS table\n" +
                        "USING (SELECT * FROM tmp_view) AS updates\n" +
                        "ON table.keyA = updates.keyA\n" +
                        "WHEN MATCHED THEN UPDATE SET *\n" +
                        "WHEN NOT MATCHED THEN INSERT *";
        spark.sql(mergeIntoQuery);

        Dataset<Row> out = spark.sql("SELECT * FROM hive." + namespace + "." + tableName);
        out.show();
    }
}

The data file is just this

{"keyA": "valueA", "keyB": "valueB"}

put inside /tmp/data/yyyymmdd=20220202 folder.

Interestingly, the error disappears when we remove the basePath option from spark.read.
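
For context on why `basePath` changes the input: Spark's partition discovery treats `key=value` directories below the base path as partition columns, so with `basePath` set to `/tmp/` the DataFrame gains a `yyyymmdd` column, whereas without the option the base path defaults to the read path itself and no such column is added. The sketch below is a simplified stand-in for that rule (Spark's real logic has more cases), with a hypothetical class name. Whether the extra column is what triggers the exception is not established in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration of Hive-style partition directories; Spark's own
// partition discovery has more rules than this.
class PartitionPathSketch {

    // Collects key=value directory names found strictly below basePath.
    static Map<String, String> partitionColumns(String basePath, String leafPath) {
        Map<String, String> columns = new LinkedHashMap<>();
        String prefix = basePath.endsWith("/") ? basePath : basePath + "/";
        String leaf = leafPath.endsWith("/") && leafPath.length() > 1
                ? leafPath.substring(0, leafPath.length() - 1)
                : leafPath;
        if (!leaf.startsWith(prefix)) {
            return columns; // leaf is the base itself or lies outside of it
        }
        for (String segment : leaf.substring(prefix.length()).split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                columns.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        String dataDir = "/tmp/data/yyyymmdd=20220202";
        // basePath above the partition directory: the directory becomes a column.
        System.out.println(partitionColumns("/tmp/", dataDir));
        // basePath equal to the read path: nothing below it, so no columns.
        System.out.println(partitionColumns(dataDir, dataDir));
    }
}
```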

sabyasachinandy commented 11 months ago

Any update on this? This is a bread-and-butter use case, and still no one seems to be looking into it as a priority.

nastra commented 11 months ago

Sorry that it feels like nobody is actively looking into this. People might be busy with other things or might not be aware of this issue. To raise awareness and get some additional eyes on it, you might want to mention it on the dev mailing list at dev@iceberg.apache.org.

sabyasachinandy commented 11 months ago

I will do @nastra. I apologize for the rude tone. Surely I will mention it in the mailing list or maybe use slack

nastra commented 11 months ago

The mailing list might be the better option in this case. I believe pretty much all committers watch the mailing list, but not everyone is active on Slack

harshith-bolar-rapido commented 11 months ago

Quick heads up, the merge into query works fine in version 1.0.0, starts breaking from 1.1.0.

sabyasachinandy commented 11 months ago

Adding to this, I did a quick check of the stack trace. Something seems to go wrong after addMetadataColumnsIfNeeded (in FileScanRDD) is called: it runs a generated projection class, which looks like it is reading a value from the source data as a long.
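
To illustrate the failure shape in the stack trace (a positional `getLong` on a generic row ending in a `ClassCastException`), here is a toy model in plain Java. It is not Spark code: `ToyRow` is a hypothetical class that, like a generic row, stores untyped values and trusts the caller to request the right type at each position.

```java
// Toy model, not Spark code: a row that stores untyped values and trusts
// the caller to ask for the right type at each position.
class ToyRow {
    private final Object[] values;

    ToyRow(Object... values) {
        this.values = values;
    }

    // Unchecked positional read: fails at runtime if the slot is not a Long.
    long getLong(int ordinal) {
        return (Long) values[ordinal];
    }

    // Returns true when reading `ordinal` as a long throws ClassCastException.
    static boolean readAsLongFails(ToyRow row, int ordinal) {
        try {
            row.getLong(ordinal);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ToyRow row = new ToyRow("valueA", 42L);
        System.out.println(readAsLongFails(row, 1)); // slot 1 really holds a Long
        System.out.println(readAsLongFails(row, 0)); // slot 0 holds a String
    }
}
```

The point of the sketch is that such an error can arise from a reader using the wrong position or expected type for a slot, even when every declared column type is a string.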

amogh-jahagirdar commented 11 months ago

I've been discussing this with @sabyasachinandy and @harshith-bolar-rapido offline. I can reproduce the issue using the minimal reproduction steps in https://github.com/apache/iceberg/issues/8333#issuecomment-1854204500, but only on Spark 3.3.0; I cannot reproduce it on Spark 3.3.4 or later.

@harshith-bolar-rapido confirmed that he cannot repro on 3.3.4 either.

So something changed in the patch releases between 3.3.0 and 3.3.4.

I'd encourage folks on 3.3.0 to upgrade to 3.3.4 or higher if possible. I'll leave this open for a bit in case other folks hit this on higher Spark versions.

SandeepSinghGahir commented 3 months ago

I understand that it's recommended to upgrade to Spark 3.3.4; however, I'm using Glue 4.0, which comes with Spark 3.3.0. Due to some other constraints, I cannot downgrade to Glue 3.0. Is there any other solution or config that we can use to avoid this error? Materializing the table and re-reading the data from it only to apply MERGE takes away the potential incremental processing savings.

nastra commented 3 months ago

@SandeepSinghGahir this appears to be an issue in Spark itself that was fixed with 3.3.4 (or maybe in a version in between). You might want to check with the Spark community whether there's a workaround for this issue.

SandeepSinghGahir commented 3 months ago

As a workaround, I'm now saving the DataFrame into an Iceberg table and am no longer seeing this issue.