
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

MERGE INTO TABLE is not supported temporarily #5424

Open jessiedanwang opened 1 year ago

jessiedanwang commented 1 year ago

Query engine

Spark 3.1.2 on EMR 6.5

Question

We are running a Spark 3.1.2 application on EMR 6.5 in account BBB while writing data to S3 and the Glue catalog in account AAA. I would appreciate any ideas about what could be wrong here.

Here is what we did:

val spark = SparkSession.builder()
  .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  // Iceberg related configs
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
  .config(s"spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config(s"spark.sql.catalog.iceberg_catalog.warehouse", warehousePath)
  .config(s"spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config(s"spark.sql.catalog.iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .config(s"spark.sql.catalog.iceberg_catalog.client.factory", "org.apache.iceberg.aws.AssumeRoleAwsClientFactory")
  .config(s"spark.sql.catalog.iceberg_catalog.client.assume-role.arn", s"arn:aws:iam::AAA:role/RoleToAssume")
  .config(s"spark.sql.catalog.iceberg_catalog.client.assume-role.region", "us-east-2")
  .config("spark.hadoop.hive.metastore.glue.catalogid", AAA)
  .enableHiveSupport()
  .getOrCreate()

df.writeTo("ns_name.table_name")
  .tableProperty("table_type", "iceberg")
  .tableProperty("format-version", "2")
  // .tableProperty("engine.hive.enabled", "true")
  .tableProperty("write.distribution-mode", "hash")
  .tableProperty("write.spark.fanout.enabled", "true")
  .tableProperty("write.parquet.compression-codec", "snappy")
  .tableProperty("write.avro.compression-codec", "snappy")
  .tableProperty("write.metadata.delete-after-commit.enabled", "true")
  .tableProperty("write.metadata.previous-versions-max", "3")
  .tableProperty("write.target-file-size-bytes", s"$GIGABYTE")
  .create()

df2.createOrReplaceGlobalTempView("temp_view")

val query: String =
  s"""MERGE INTO ns_name.table_name t
     |USING (SELECT * FROM global_temp.temp_view) s
     |ON s.id = t.id
     |WHEN MATCHED THEN UPDATE SET *
     |WHEN NOT MATCHED THEN INSERT *
     |""".stripMargin

spark.sql(query)

Got the following error when trying to do the MERGE INTO:

java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
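For context, this exception comes from Spark's own planner (SparkStrategies.BasicOperators, as the later stack trace in this thread shows): it is thrown when a MergeIntoTable plan reaches the default planner without having been rewritten by the Iceberg extensions, which usually means the extensions were not active in the session that ran the query. A small check, sketched here in PySpark rather than the Scala from the post, to confirm the setting actually took effect on the live session:

from pyspark.sql import SparkSession

# Sketch: if a SparkSession already existed before the builder ran (common in
# notebooks and shells), builder configs such as spark.sql.extensions can be
# silently ignored, so inspect the live session rather than the builder.
spark = SparkSession.builder.getOrCreate()
print(spark.conf.get("spark.sql.extensions", "<not set>"))
# Expect the output to include:
# org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions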

rakesh-tmdc commented 1 year ago

Hi, I am also facing a similar issue with the MERGE INTO functionality. I am using Spark 3.3.0 with Delta Lake and Iceberg 0.14.0, and I get the error below with the complete query:

Caused by: org.apache.spark.sql.AnalysisException: unresolved operator 'ReplaceIcebergData RelationV2[__metadata#404, uuid#405, ts#406, user_id#407, segment_id#408, activity#409, activity_occurrence#410, repeated_at#411] default.segment_table;

With the merge into clause:

MERGE INTO default.segment_table old
USING (SELECT * FROM segment_final) new
ON old.uuid = new.uuid AND old.repeated_at IS NULL
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

However, when I try a single merge condition separately, such as WHEN MATCHED THEN UPDATE SET * or WHEN NOT MATCHED THEN INSERT * on its own, it works fine.

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

suisenkotoba commented 1 year ago

Has anyone had any luck with this? I am facing the same issue and could not find what caused it. I am not sure if there is something wrong with my Spark configuration. I use Spark 3.3 on Dataproc (image version 2.1) with Iceberg 1.1.0. The Dataproc cluster already has a Dataproc Metastore attached. I already added the Iceberg extension in my Spark config and even use table format version 2, but I still get the error MERGE INTO TABLE is not supported temporarily. This is my Python code:

from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import LongType, BooleanType, StructField, StructType

conf = (
        SparkConf()
        .setAppName('test_iceberg')
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
        .set('spark.sql.catalog.spark_catalog.type', 'hive')
        .set('spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.dev.type', 'hive')
    )
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

df = spark.read.option("mergeSchema", "true").parquet('gs://poc-kafka/mis').sort("timestamp")

# Create iceberg table 
schema = df.select("value.after.*").schema
additional_fields = [StructField('start_at', LongType(), True),
                     StructField('end_at', LongType(), True),
                     StructField('is_current', BooleanType(), True)]
schema = StructType(schema.fields + additional_fields)
empty_df = spark.createDataFrame([], schema)
spark.sql("CREATE DATABASE IF NOT EXISTS dev.dummy")
empty_df.createOrReplaceTempView("empty")
spark.sql("CREATE TABLE IF NOT EXISTS dev.dummy.fruit USING iceberg  "
         "TBLPROPERTIES ('format-version'='2') "
         "AS (SELECT * FROM empty) ")

# Merge table
df.createOrReplaceTempView("changelog")
sql = "MERGE INTO dev.dummy.fruit d " \
      "USING (SELECT value.after.*, value.op op, " \
      "            value.source.ts_ms start_at, True is_current, " \
      "            NULL end_at " \
      "        FROM changelog) s " \
      "ON d.id = s.id " \
      "WHEN MATCHED THEN UPDATE SET d.is_current = False, d.end_at = s.start_at " \
      "WHEN NOT MATCHED THEN INSERT * "
spark.sql(sql)

This is the traceback I got:

/usr/lib/spark/python/pyspark/sql/session.py in sql(self, sqlQuery, **kwargs)
   1032             sqlQuery = formatter.format(sqlQuery, **kwargs)
   1033         try:
-> 1034             return DataFrame(self._jsparkSession.sql(sqlQuery), self)
   1035         finally:
   1036             if len(kwargs) > 0:

/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    188     def deco(*a: Any, **kw: Any) -> Any:
    189         try:
--> 190             return f(*a, **kw)
    191         except Py4JJavaError as e:
    192             converted = convert_exception(e.java_exception)

/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o67.sql.
: java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891)
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:821)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)

If it helps, these are the Dataproc cluster properties I included when creating the cluster:

--properties='^#^dataproc:pip.packages=google-cloud-secret-manager==2.15.0,numpy==1.24.1#spark:spark.jars=https://jdbc.postgresql.org/download/postgresql-42.5.1.jar,https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.3_2.13/1.1.0/iceberg-spark-runtime-3.3_2.13-1.1.0.jar#core:fs.defaultFS=gs://de-dev-dataproc-fs'
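One thing that may be worth double-checking in this setup (an observation, not a confirmed cause): the runtime jar referenced above is the Scala 2.13 build (iceberg-spark-runtime-3.3_2.13), and if the cluster's Spark is built against a different Scala version the extension classes may fail to load, leaving Spark's default planner in place and producing exactly this "not supported temporarily" error. A quick PySpark sketch, reusing the spark session created above, to confirm the extensions class is at least visible to the driver JVM:

# Sketch: ask the driver JVM to load the extensions class shipped in the
# iceberg-spark-runtime jar; an exception here means the jar (or a compatible
# build of it) is not usable on the driver classpath. _jvm is an internal
# py4j handle, used here only for a quick manual check.
jvm = spark.sparkContext._jvm
try:
    jvm.java.lang.Class.forName(
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    print("Iceberg extensions class found on the driver classpath")
except Exception as err:
    print("Iceberg extensions class not found:", err)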

owencwl commented 1 year ago

I am also facing a similar issue with the MERGE INTO functionality, on Spark 3.2 + Iceberg 0.13.1:

spark-sql:

merge into spark_catalog.default.merge_test as t
using (select 1 as id, '123456' as data, 'flink1.14.5' as category, cast('2016-08-31' as date) as ts) as s
on t.id = s.id
when matched then delete
when not matched then insert *;

spark-error:

Error in query:
Project [id#50L, data#51, category#52, ts#53]
+- RowFilterAndDataMaskingMarker
   +- RelationV2[id#50L, data#51, category#52, ts#53] spark_catalog.default.merge_test
is not an Iceberg table

Could the temporary table (aliased as s) be the reason?

scala-source-code: org.apache.spark.sql.catalyst.analysis.RewriteMergeIntoTable

EliminateSubqueryAliases(aliasedTable) match {
  case r: DataSourceV2Relation => // ...
  case p =>
    throw new AnalysisException(s"$p is not an Iceberg table")
}
Why does this problem arise?
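Not an authoritative answer, but one well-documented requirement for this case: the rewrite only fires when the target resolves to an Iceberg DataSourceV2Relation, and for tables under the built-in spark_catalog that requires replacing the session catalog with Iceberg's SparkSessionCatalog. A minimal PySpark sketch of that configuration (the "hive" catalog type is an assumption; adjust to your metastore):

from pyspark.sql import SparkSession

# Sketch only: with these settings, tables such as
# spark_catalog.default.merge_test are loaded through Iceberg's
# SparkSessionCatalog and resolve as Iceberg V2 relations.
spark = (
    SparkSession.builder
    .appName("iceberg-merge")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .getOrCreate()
)

Note also that in the plan above the relation is wrapped in a RowFilterAndDataMaskingMarker node which, judging from the snippet quoted, would not match the DataSourceV2Relation case and so falls through to the "$p is not an Iceberg table" branch.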

alhaiz313 commented 1 month ago

In my case it was because I forgot to add this configuration to my Spark job:

--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

rakesh-tmdc commented 1 month ago

> I am also facing a similar issue with the MERGE INTO functionality, on Spark 3.2 + Iceberg 0.13.1 [...] Why does this problem arise?

https://stackoverflow.com/questions/73424454/spark-iceberg-merge-into-issue-caused-by-org-apache-spark-sql-analysisexcep

I ran into the same issue; try this.