Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

ERROR when executing UPDATE/DELETE queries in Iceberg 1.6.0: "Cannot add fieldId 1 as an identifier field" #11341

Open a8356555 opened 1 month ago

a8356555 commented 1 month ago

Apache Iceberg version

1.6.0

Query engine

Spark

Please describe the bug 🐞

Description: I'm encountering an issue when running UPDATE or DELETE queries after upgrading to Iceberg 1.6.0. These queries worked fine before 1.6.0, but now I get the following error in PySpark:

pyspark.errors.exceptions.captured.IllegalArgumentException: Cannot add fieldId 1 as an identifier field: field does not exist

This error occurs whenever I try to execute any UPDATE or DELETE query on my Iceberg table. The table was originally created with FlinkSQL (Flink is used for real-time upserting). Here is the script I used to create the table:

CREATE CATALOG glue_catalog WITH (
    'type'='iceberg', 
    'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
    'warehouse'='s3://my-bucket'
);

CREATE DATABASE IF NOT EXISTS glue_catalog.my_db;
CREATE TABLE IF NOT EXISTS glue_catalog.my_db.my_table(
    id_b  BIGINT,
    ...,
    id_a  STRING,
    id_c DATE,
    PRIMARY KEY (id_b,id_a,id_c) NOT ENFORCED
) PARTITIONED BY (
    id_c
) WITH (
    'format-version'='2',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.upsert.enabled'='true',
    'write.delete.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read',
    'write.update.mode'='merge-on-read'
);

Query Execution: The issue occurs when running an UPDATE or DELETE query in PySpark after the upgrade to Iceberg 1.6.0 (an illustrative example of the kind of statement is sketched below).

Expected Behavior: The queries should execute successfully, as they did before Iceberg 1.6.0.
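For illustration, the failing statements have this general shape (the catalog name, the column some_col, and all literal values are placeholders; the DDL above elides most columns):

# Illustrative only: the exact statements were not included in the report.
# `some_col` stands in for one of the non-key columns elided above.
spark.sql("""
    UPDATE glue_catalog.my_db.my_table
    SET some_col = 'new-value'
    WHERE id_b = 123 AND id_a = 'abc' AND id_c = DATE '2024-01-01'
""")

spark.sql("""
    DELETE FROM glue_catalog.my_db.my_table
    WHERE id_c < DATE '2024-01-01'
""")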

Environment Details:
previous iceberg-spark-runtime package version: iceberg-spark-runtime-3.4_2.12:1.5.2 (works fine)
current iceberg-spark-runtime package version: iceberg-spark-runtime-3.4_2.12:1.6.0 (issue)
iceberg-flink-runtime package version: iceberg-flink-runtime-1.16-1.4.1.jar

Willingness to contribute

nastra commented 4 weeks ago

@a8356555 can you please add some details on the UPDATE/DELETE query you're running? Additionally, the full stack trace would be helpful.
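One way to capture the full JVM stack trace from PySpark is sketched below (assuming Spark 3.x, where the spark.sql.pyspark.jvmStacktrace.enabled conf appends the JVM stack trace to the captured exception; the UPDATE statement is a placeholder):

import traceback

# Ask PySpark to include the JVM stack trace in captured exceptions (Spark 3.x).
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", "true")

try:
    # Placeholder statement: substitute the failing UPDATE/DELETE here.
    spark.sql("UPDATE glue_catalog.my_db.my_table SET some_col = 'x' WHERE id_b = 1")
except Exception:
    # Prints the Python-side trace plus the JVM stack trace appended by Spark.
    traceback.print_exc()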

meatheadmike commented 1 week ago

This bug does not appear to be limited to AWS or Flink. I'm getting the same error with the following:

spark.sql(f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS {iceberg_catalog}.{iceberg_db}.{iceberg_table}
    (
        offset BIGINT NOT NULL,
        kafka_timestamp TIMESTAMP NOT NULL,
        partition_key INT NOT NULL,
        domain STRING NOT NULL,
        risk STRING,
        timestamp TIMESTAMP NOT NULL
    )
    USING iceberg
    LOCATION '{iceberg_table_path}'
    TBLPROPERTIES (
            'format-version'='2',
            'write.delete.mode'='merge-on-read',
            'write.merge.mode'='merge-on-read',
            'write.metadata.delete-after-commit.enabled'='true',
            'write.update.mode'='merge-on-read',
            'write.upsert.enabled'='true'
    )
""").collect()

spark.sql(f"ALTER TABLE {iceberg_catalog}.{iceberg_db}.{iceberg_table} SET IDENTIFIER FIELDS domain").collect()

def processMicroBatch(batch_df, batch_id):
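    # Inspect the incoming micro-batch, then upsert it into the Iceberg table via MERGE INTO.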
    batch_df.printSchema()
    batch_df.show()
    batch_df.createOrReplaceTempView("kafka_source")
    batch_df.sparkSession.sql(f"""
        MERGE INTO `{iceberg_catalog}`.`{iceberg_db}`.`{iceberg_table}` t
        USING (SELECT * FROM `kafka_source`) s
        ON t.domain = s.domain
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

df.writeStream \
   .format("iceberg") \
   .trigger(processingTime="1 minutes") \
   .options(**iceberg_options) \
   .outputMode("append") \
   .foreachBatch(processMicroBatch) \
   .start() 

spark.streams.awaitAnyTermination()

The returned error is:

pyspark.errors.exceptions.captured.IllegalArgumentException: Cannot add fieldId 4 as an identifier field: field does not exist

Note that the identifier field (domain) is the 4th field in the schema.

So it would appear that it is currently not possible to do streaming upserts with Spark.
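For reference, a minimal sketch of one way to confirm which field IDs and identifier-field-ids the table metadata actually records is to read the latest metadata JSON directly (the path below is hypothetical; for an S3 or HDFS warehouse the file lives under <table location>/metadata/ and would need to be fetched first; per the Iceberg v2 table spec, each entry in "schemas" may carry "identifier-field-ids"):

import json

# Hypothetical local copy of the table's latest metadata JSON.
metadata_json_path = "/tmp/my_table.metadata.json"

with open(metadata_json_path) as f:
    metadata = json.load(f)

current_schema_id = metadata.get("current-schema-id")
for schema in metadata.get("schemas", []):
    marker = " (current)" if schema.get("schema-id") == current_schema_id else ""
    print(f"schema-id={schema.get('schema-id')}{marker}")
    print(f"  identifier-field-ids={schema.get('identifier-field-ids')}")
    for field in schema.get("fields", []):
        print(f"  field id={field['id']:>3}  name={field['name']}")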