apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
6.28k stars · 2.19k forks

It sometimes throws exception java.lang.AssertionError: assertion failed after upgrade to Iceberg 1.3.1 + Spark 3.4.1 #9092

Open jackwang2 opened 11 months ago

jackwang2 commented 11 months ago

Apache Iceberg version

1.3.1

Query engine

Spark

Please describe the bug 🐞

After upgrading from Iceberg 1.2.0 / Spark 3.3.1 to Iceberg 1.3.1 / Spark 3.4.1, the job has become unstable: it intermittently throws `java.lang.AssertionError: assertion failed` when running a simple `INSERT ... SELECT`, even after 3 retries.

INSERT INTO hive_prod.lena.idsp_tpats_reattribute (
       event_id,
       event_type,
       timestamp,
       is_test,
       is_demand_third_party,
       is_anonymous_vpn,
       is_suspicious_ip,
       ingest_time_at_s3,
       txn_time,
       ingest_time,
       timestamp__deprecated)
SELECT event_id,
       event_type,
       timestamp,
       is_test,
       is_demand_third_party,
       is_anonymous_vpn,
       is_suspicious_ip,
       ingest_time_at_s3,
       txn_time,
       ingest_time,
       NULL
FROM   _temp

I checked the Spark 3.4.1 source code; the failing assertion, in the `ColumnarToRowExec` constructor, is:

assert(Utils.isInRunningSparkTask || child.supportsColumnar)
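For context, the stack trace below shows this assertion firing while Spark clones a cached plan (`InMemoryRelation.innerChildren` → `TreeNode.clone` → `ColumnarToRowExec.<init>`) during explain-string generation (`ExplainUtils.processPlan`). Purely as a diagnostic, and as an assumption on my part rather than a confirmed fix, it may be worth testing whether preventing AQE from re-planning cached relations (behavior that changed around Spark 3.4) avoids this code path:

```properties
## Diagnostic only (assumption, not a verified fix): keep AQE from changing
## the output partitioning of cached (InMemoryRelation) plans.
spark.sql.optimizer.canChangeCachedPlanOutputPartitioning=false
```

If the assertion stops firing with this setting, that would point at the AQE-on-cached-plan interaction rather than at Iceberg itself.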

Spark job configuration:

##########################################
## About Iceberg
##########################################
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type=hive
spark.sql.catalog.hive_prod.uri=thrift://***.hive.***.io:9083

##########################################
## About S3
##########################################
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider

##########################################
## About Spark
##########################################
spark.driver.memory=10G
spark.driver.maxResultSize=4G
spark.driver.supervise=false
spark.driver.extraJavaOptions=-Xss20M

spark.executor.cores=10
spark.executor.extraJavaOptions=-Xss20M

spark.speculation=true
spark.rdd.compress=true
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max=256M
spark.kryo.referenceTracking=false

spark.sql.shuffle.partitions=2000
spark.sql.autoBroadcastJoinThreshold=-1

spark.sql.adaptive.enabled=true

spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.parallelismFirst=false
spark.sql.adaptive.coalescePartitions.initialPartitionNum=40
spark.sql.adaptive.coalescePartitions.minPartitionSize=50MB
spark.sql.adaptive.advisoryPartitionSizeInBytes=128MB

spark.sql.adaptive.autoBroadcastJoinThreshold=600MB
spark.sql.adaptive.localShuffleReader.enabled=true

Error Messages:

23/11/15 22:31:54 WARN SparkMain: Appending installs which have no delivery to hive_prod.lena.idsp_installs_reattribute with 1 time(s)
23/11/15 22:31:54 WARN SparkMain: Append data to hive_prod.lena.idsp_installs_reattribute
23/11/15 22:31:59 WARN SparkMain: Appending installs which have no delivery to hive_prod.lena.idsp_installs_reattribute with 2 time(s)
23/11/15 22:31:59 WARN SparkMain: Append data to hive_prod.lena.idsp_installs_reattribute
23/11/15 22:32:01 WARN SparkMain: Appending installs which have no delivery to hive_prod.lena.idsp_installs_reattribute with 3 time(s)
23/11/15 22:32:01 WARN SparkMain: Append data to hive_prod.lena.idsp_installs_reattribute
Exception in thread "main" java.lang.reflect.InvocationTargetException
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:63)
  at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.reflect.InvocationTargetException
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$6(TreeNode.scala:813)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:812)
  at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:869)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$clone$1(TreeNode.scala:853)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:355)
  at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:851)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$clone$1(TreeNode.scala:853)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:355)
  at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:851)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$clone$1(TreeNode.scala:853)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:355)
  at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:851)
  at org.apache.spark.sql.execution.columnar.InMemoryRelation.innerChildren$lzycompute(InMemoryRelation.scala:390)
  at org.apache.spark.sql.execution.columnar.InMemoryRelation.innerChildren(InMemoryRelation.scala:382)
  at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$removeTags$2(ExplainUtils.scala:283)
  at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$removeTags$2$adapted(ExplainUtils.scala:280)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:285)
  at org.apache.spark.sql.execution.ExplainUtils$.removeTags(ExplainUtils.scala:280)
  at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$removeTags$1(ExplainUtils.scala:277)
  at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$removeTags$1$adapted(ExplainUtils.scala:277)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.execution.ExplainUtils$.remove$1(ExplainUtils.scala:277)
  at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$removeTags$2(ExplainUtils.scala:283)
  at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$removeTags$2$adapted(ExplainUtils.scala:280)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:285)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:286)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:286)
  at org.apache.spark.sql.execution.ExplainUtils$.removeTags(ExplainUtils.scala:280)
  at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$removeTags$1(ExplainUtils.scala:277)
  at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$removeTags$1$adapted(ExplainUtils.scala:277)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.execution.ExplainUtils$.remove$1(ExplainUtils.scala:277)
  at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$removeTags$2(ExplainUtils.scala:281)
  at org.apache.spark.sql.execution.ExplainUtils$.$anonfun$removeTags$2$adapted(ExplainUtils.scala:280)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:285)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:286)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:286)
  at org.apache.spark.sql.execution.ExplainUtils$.removeTags(ExplainUtils.scala:280)
  at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:107)
  at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
  at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
  at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
  at com.vungle.lena.BoilerplateSparkMain.appendToIcebergTable(Boilerplate.scala:1818)
  at com.vungle.lena.BoilerplateSparkMain.appendToIcebergTable$(Boilerplate.scala:1812)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.appendToIcebergTable(SparkMain.scala:10)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.$anonfun$process$1(SparkMain.scala:175)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at scala.util.Try$.apply(Try.scala:213)
  at com.vungle.lena.BoilerplateSparkMain.retry(Boilerplate.scala:407)
  at com.vungle.lena.BoilerplateSparkMain.retry$(Boilerplate.scala:406)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.retry(SparkMain.scala:10)
  at com.vungle.lena.BoilerplateSparkMain.withRetry(Boilerplate.scala:420)
  at com.vungle.lena.BoilerplateSparkMain.withRetry$(Boilerplate.scala:418)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.withRetry(SparkMain.scala:10)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.process(SparkMain.scala:165)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.$anonfun$run$3(SparkMain.scala:257)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.$anonfun$run$3$adapted(SparkMain.scala:255)
  at com.vungle.lena.BoilerplateSparkMain.withCoba2TempViewInRange(Boilerplate.scala:1354)
  at com.vungle.lena.BoilerplateSparkMain.withCoba2TempViewInRange$(Boilerplate.scala:1343)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.withCoba2TempViewInRange(SparkMain.scala:10)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.$anonfun$run$2(SparkMain.scala:255)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.$anonfun$run$2$adapted(SparkMain.scala:237)
  at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.run(SparkMain.scala:237)
  at com.vungle.lena.BoilerplateSparkMain.main(Boilerplate.scala:2625)
  at com.vungle.lena.BoilerplateSparkMain.main$(Boilerplate.scala:2605)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.main(SparkMain.scala:10)
  at com.vungle.lena.idsp.installs_attribution.SparkMain.main(SparkMain.scala)
  ... 6 more
Caused by: java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:208)
  at org.apache.spark.sql.execution.ColumnarToRowExec.<init>(Columnar.scala:72)
  ... 173 more
nastra commented 11 months ago

According to

 at com.vungle.lena.BoilerplateSparkMain.appendToIcebergTable(Boilerplate.scala:1818)
  at com.vungle.lena.BoilerplateSparkMain.appendToIcebergTable$(Boilerplate.scala:1812)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.appendToIcebergTable(SparkMain.scala:10)
  at com.vungle.lena.idsp.installs_attribution.SparkMain$.$anonfun$process$1(SparkMain.scala:175)

it looks like you're using some custom code, and without knowing what's inside it, it's difficult to tell why this issue happens. I'm also not sure the issue itself is related to Iceberg.
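For what it's worth, the `retry`/`withRetry` frames in the stack trace suggest a generic retry wrapper around the SQL call. A minimal sketch of that pattern (a hypothetical reconstruction in Python; the names `retry`, `max_attempts`, and `delay_s` are mine, not from the Vungle code):

```python
import time


def retry(action, max_attempts=3, delay_s=1.0):
    """Call `action` until it succeeds or `max_attempts` is exhausted.

    Re-raises the last exception if every attempt fails. Note that a
    wrapper like this cannot help when the failure is deterministic,
    as a planner assertion typically is: every attempt hits the same
    code path and fails the same way.
    """
    last_exc = None
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except Exception as exc:
            last_exc = exc
            if attempt < max_attempts:
                time.sleep(delay_s)
    raise last_exc
```

The repeated "Appending installs ... with N time(s)" log lines above are consistent with a loop of this shape failing all three attempts.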

jackwang2 commented 11 months ago

Hi @nastra, thanks for the reply. The code is shown below; both _temp and hive_prod.lena.idsp_tpats_reattribute are Iceberg tables:

INSERT INTO hive_prod.lena.idsp_tpats_reattribute (
       event_id,
       event_type,
       timestamp,
       is_test,
       is_demand_third_party,
       is_anonymous_vpn,
       is_suspicious_ip,
       ingest_time_at_s3,
       txn_time,
       ingest_time,
       timestamp__deprecated)
SELECT event_id,
       event_type,
       timestamp,
       is_test,
       is_demand_third_party,
       is_anonymous_vpn,
       is_suspicious_ip,
       ingest_time_at_s3,
       txn_time,
       ingest_time,
       NULL
FROM   _temp
nastra commented 11 months ago

@jackwang2 I didn't mean the SQL, but the actual code inside those custom modules. You must be doing something there that fails the particular assertion you mentioned above.

jackwang2 commented 11 months ago

Add @jiantao-vungle and @daoxunwu-vungle.

jiantao-vungle commented 11 months ago

@nastra there are no particularly special operations inside this module, except that we have a deprecated field named timestamp__deprecated and we insert NULL into it.
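Since the only unusual element is the untyped NULL literal in the final SELECT column, one thing worth trying (my suggestion, not verified against this bug) is giving the literal an explicit type so the planner never has to carry a NullType column through the insert:

```sql
-- Hypothetical variant: cast the literal to the target column's type
-- (assumed here to be TIMESTAMP; use the actual type of timestamp__deprecated).
SELECT event_id,
       event_type,
       timestamp,
       is_test,
       is_demand_third_party,
       is_anonymous_vpn,
       is_suspicious_ip,
       ingest_time_at_s3,
       txn_time,
       ingest_time,
       CAST(NULL AS TIMESTAMP) AS timestamp__deprecated
FROM   _temp
```

If the assertion disappears with the explicit cast, that would narrow the problem down to how the untyped NULL column is planned.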

nastra commented 10 months ago

@jiantao-vungle I would still check what the custom code is doing, as it must be doing something that fails the particular assertion mentioned further above.

github-actions[bot] commented 1 week ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.