huanghanyu-vungle opened this issue 1 year ago
Details on the query you're running would be helpful, if possible. Also, is the issue you're seeing specific to 1.4.2, or do older versions work for you?
@huanghanyu-vungle can you also please share your entire catalog configuration?
According to
at com.vungle.lena.idsp.aggregation_report.SparkMain$.saveReport(SparkMain.scala:233)
at com.vungle.lena.idsp.aggregation_report.SparkMain$.doForEachPeriod(SparkMain.scala:156)
at com.vungle.lena.idsp.aggregation_report.SparkMain$.$anonfun$run$1(SparkMain.scala:67)
at com.vungle.lena.idsp.aggregation_report.SparkMain$.$anonfun$run$1$adapted(SparkMain.scala:63)
it looks like you're running some custom code on top of Iceberg, so without knowing any more details it's difficult to help here.
@nastra Sorry for the late reply. This is my catalog configuration:
##########################################
## About Iceberg
##########################################
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type=hive
spark.sql.catalog.hive_prod.uri=thrift://***.hive.***.io:9083
##########################################
## About S3
##########################################
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider
##########################################
## About Spark
##########################################
spark.driver.memory=4g
spark.driver.maxResultSize=1g
spark.driver.supervise=false
spark.executor.extraJavaOptions=-XX:+UseG1GC
spark.memory.fraction=0.7
spark.memory.storageFraction=0.2
spark.speculation=true
spark.rdd.compress=true
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max=256m
spark.sql.shuffle.partitions=3000
spark.sql.broadcastTimeout=300
spark.sql.autoBroadcastJoinThreshold=314572800
spark.sql.adaptive.autoBroadcastJoinThreshold=314572800
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.parallelismFirst=false
spark.sql.adaptive.coalescePartitions.initialPartitionNum=10
spark.sql.adaptive.advisoryPartitionSizeInBytes=134217728
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=3
Sorry for the late reply @nastra @amogh-jahagirdar. This is my saveReport function; it only runs fairly simple SQL statements:
def saveReport(data: DataFrame, targetTable: String, isPublisherReport: Boolean): Unit = {
  val tmpTableName = "__tmp"
  data.createOrReplaceTempView(tmpTableName)

  // Compute the event/ingest time window covered by this batch.
  val (min_event_time, max_event_time, min_ingest_time, max_ingest_time) = {
    val timeStatRow = spark.sql(s"""
      SELECT
        date_format(MIN(event_time), 'yyyy-MM-dd HH') min_event_time,
        date_format(MAX(event_time), 'yyyy-MM-dd HH') max_event_time,
        date_format(MIN(ingest_time), 'yyyy-MM-dd HH') min_ingest_time,
        date_format(MAX(ingest_time), 'yyyy-MM-dd HH') max_ingest_time
      FROM $tmpTableName
    """).collect()(0)
    (timeStatRow.getString(0), timeStatRow.getString(1),
      timeStatRow.getString(2), timeStatRow.getString(3))
  }

  // Delete the rows of the target table that fall inside that window, then re-append the batch.
  if (isPublisherReport) {
    spark.sql(s"""
      DELETE FROM $targetTable
      WHERE event_time >= '$min_event_time' AND event_time <= '$max_event_time' AND
            ingest_time >= '$min_ingest_time' AND ingest_time <= '$max_ingest_time' AND
            dsp_type = 'external' AND is_skan is false
    """)
  } else {
    spark.sql(s"""
      DELETE FROM $targetTable
      WHERE event_time >= '$min_event_time' AND event_time <= '$max_event_time' AND
            ingest_time >= '$min_ingest_time' AND ingest_time <= '$max_ingest_time'
    """)
  }

  appendToIcebergTable(targetTable, data)
}

def appendToIcebergTable(targetTable: String, df: DataFrame): Unit = {
  _logger.warn(s"Append data to $targetTable")
  val (targetCols, sourceCols) = matchDFSchemaWithTargetTable(targetTable, df)
  df.createOrReplaceTempView("_temp")
  spark.sql(s"""
    INSERT INTO $targetTable ($targetCols) SELECT $sourceCols FROM _temp
  """)
  _logger.warn(s"Done append data to $targetTable")
  getIcebergLastAppendCountVerbose(targetTable)
}
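For what it's worth, the stack trace in the issue description below points at the Dataset.collect call inside saveReport (SparkMain.scala:233), which presumably corresponds to the .collect() on the MIN/MAX time-stat query above. A hedged sketch of how that statement could be isolated (it assumes the same spark session and the __tmp view registered by saveReport):

// Debugging sketch only: assumes the `spark` session and the `__tmp` temp view
// created by saveReport above. Running the time-stat query on its own shows whether
// the optimizer assertion already fires on this SELECT or only on the later
// DELETE/INSERT statements.
val stats = spark.sql("""
  SELECT
    date_format(MIN(event_time), 'yyyy-MM-dd HH') AS min_event_time,
    date_format(MAX(event_time), 'yyyy-MM-dd HH') AS max_event_time,
    date_format(MIN(ingest_time), 'yyyy-MM-dd HH') AS min_ingest_time,
    date_format(MAX(ingest_time), 'yyyy-MM-dd HH') AS max_ingest_time
  FROM __tmp
""")
stats.explain(true)  // triggers the optimization phase, so the [INTERNAL_ERROR] would surface here
stats.collect()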
I am also seeing this issue. I have existing Iceberg tables for which a large number of Spark SQL queries simply fail once I move to newer libraries.
My existing tables were created and updated using Spark on EMR over the past year. I can reproduce the failures only on newer EMR/Iceberg environments, with the same queries that run fine on earlier ones (EMR Iceberg release history: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/Iceberg-release-history.html).
emr-6.10.0 - Reads and Writes work
emr-6.10.1 - Reads and Writes work
...
emr-6.14.0 - Reads and Writes work
emr-6.15.0 - Some reads don't work
emr-7.0.0 - Some reads don't work
The cutoff appears to be Iceberg version 1.4.0.
I'm not sure if this helps, since I'm working with an obfuscated example. The failing query includes many CTEs, and the error only appears with a particular one.
spark.sql("""
WITH
a as (SELECT * FROM table WHERE <predicate>),
b as (SELECT * FROM table2 WHERE <predicate>)
... joins, filters, etc
z as (SELECT * FROM a union b union c join d...)
SELECT * FROM z"""
)
An intermediate CTE is where the error manifests, and I cannot see anything about it that is immediately suspicious:
m AS (SELECT col1, col2, col3, col4, ... FROM l)
Such that SELECT * FROM l succeeds, but SELECT * FROM m fails.
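To make the shape concrete, a rough sketch of that structure with invented table and column names (the real query is obfuscated, so this only illustrates the pattern, not a confirmed reproduction):

// Sketch of the structure described above, with made-up names.
// Per the description, selecting straight from the inner CTE works, while adding
// one more plain projection on top of it is where the error manifests.
val l = spark.sql("SELECT * FROM hive_prod.db.some_table WHERE event_date = '2024-01-01'")
l.createOrReplaceTempView("l")

spark.sql("SELECT * FROM l").count()                       // reported to succeed
spark.sql("SELECT col1, col2, col3, col4 FROM l").count()  // reported to fail with the assertion error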
@ZachDischner could you provide the detailed stack trace in your case? The error being thrown by Spark is just a generic error and the stack trace details matter in this case to figure out where the bug is coming from.
Exact trace is:
Exception in thread "main" org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase optimization failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:615)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:627)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:256)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:255)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:157)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:153)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:371)
at org.apache.spark.sql.execution.CacheManager.$anonfun$cacheQuery$2(CacheManager.scala:125)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:120)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:93)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:3829)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:3839)
<Application code is performing a df.cache.count for data read from a table>
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at org.apache.spark.sql.execution.datasources.v2.PushablePredicate$.$anonfun$unapply$1(DataSourceV2Strategy.scala:667)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.execution.datasources.v2.PushablePredicate$.unapply(DataSourceV2Strategy.scala:666)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.translateLeafNodeFilterV2(DataSourceV2Strategy.scala:560)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.translateFilterV2WithMapping(DataSourceV2Strategy.scala:613)
at org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.$anonfun$pushFilters$3(PushDownUtils.scala:88)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.pushFilters(PushDownUtils.scala:85)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:74)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:61)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:503)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:503)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1288)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1287)
at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:99)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1288)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1287)
at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:99)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:508)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:741)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1314)
at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1313)
at org.apache.spark.sql.catalyst.plans.logical.Join.mapChildren(basicLogicalOperators.scala:572)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1288)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1287)
at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:99)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1288)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1287)
at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:99)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1314)
at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1313)
at org.apache.spark.sql.catalyst.plans.logical.Join.mapChildren(basicLogicalOperators.scala:572)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1288)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1287)
at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:99)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1288)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1287)
at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:99)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:508)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:479)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:447)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.pushDownFilters(V2ScanRelationPushDown.scala:61)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$3(V2ScanRelationPushDown.scala:45)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$8(V2ScanRelationPushDown.scala:52)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:51)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:38)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:239)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:236)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$6(RuleExecutor.scala:319)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$RuleExecutionContext$.withContext(RuleExecutor.scala:368)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5(RuleExecutor.scala:319)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5$adapted(RuleExecutor.scala:309)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:309)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:195)
at org.apache.spark.sql.catalyst.optimizer.BaseOptimizer.super$execute(BaseOptimizer.scala:28)
at org.apache.spark.sql.catalyst.optimizer.BaseOptimizer.$anonfun$execute$1(BaseOptimizer.scala:28)
at org.apache.spark.sql.catalyst.optimizer.OptimizationContext$.withOptimizationContext(OptimizationContext.scala:80)
at org.apache.spark.sql.catalyst.optimizer.BaseOptimizer.execute(BaseOptimizer.scala:28)
at org.apache.spark.sql.catalyst.optimizer.BaseOptimizer.execute(BaseOptimizer.scala:23)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:191)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:161)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:219)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:256)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:625)
I'm trying to recreate the exact conditions of the failure.
This might be related to https://issues.apache.org/jira/browse/SPARK-46847. When switching the Iceberg version, did you also switch the Spark version? That Spark issue started to happen in Spark 3.4.x (https://github.com/apache/spark/commit/53df45650af1e48e01e392caed6c1f83c2e9e9f1#diff-b2edb35eb7c49f1f3c1fd074927328ac103e9a2cd058e6183e4cb7604ada6dd4).
That appears to be it. I didn't knowingly switch Spark versions - I observed this when upgrading from EMR 6.14 (Spark 3.4.1, Iceberg 1.3.1-amzn-0) to EMR 6.15+ (Spark 3.4.1, Iceberg 1.4.0-amzn-0). Same with EMR 7, which runs on Spark 3.5.
That ticket appears to describe the same type of problem, in https://github.com/apache/spark/blob/2b0e841a35343343c82e8ca15225014b64d8c59f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala#L669, where
object PushablePredicate {
  def unapply(e: Expression): Option[Predicate] =
    new V2ExpressionBuilder(e, true).build().map { v =>
      assert(v.isInstanceOf[Predicate])
      v.asInstanceOf[Predicate]
    }
}
the expression isn't actually an instance of Predicate? This seems like a Spark issue; I wonder if the Iceberg upgrade was simply what exercised the code path and uncovered it.
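If that reading is right, the natural fix on the Spark side would presumably be to treat a non-Predicate translation as "not pushable" rather than asserting. A rough sketch of that idea (not the actual Spark patch, just an illustration of the expected behavior, using the same types as the excerpt above):

// Sketch only -- not the actual Spark patch. The idea: if the V2 translation of an
// expression is not a connector Predicate, unapply returns None so the filter is
// simply not pushed down, instead of failing the whole optimization phase.
object PushablePredicate {
  def unapply(e: Expression): Option[Predicate] =
    new V2ExpressionBuilder(e, true).build().collect {
      case p: Predicate => p
    }
}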
I reported a similar issue: https://issues.apache.org/jira/browse/SPARK-47463
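Going by that ticket's title (filter pushdown failing for an IF expression against an Iceberg source), a hedged guess at the kind of query shape involved; the table and columns below are invented, and this only illustrates the shape, not a confirmed reproduction:

// Illustration only, with made-up names: an IF/CASE expression used directly as the
// boolean filter. The guess (based on the ticket title and the assert above) is that
// such a filter translates into a general V2 expression rather than a connector
// Predicate, which is exactly the mismatch the assertion complains about.
val df = spark.sql(
  """SELECT *
    |FROM hive_prod.db.events
    |WHERE IF(dsp_type = 'external', true, false)""".stripMargin)
df.cache()   // caching/collecting forces optimization, which is where the internal error surfaces
df.count()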
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
Apache Iceberg version
1.4.2 (latest release)
Query engine
Spark
Please describe the bug 🐞
Iceberg 1.4.2, Spark 3.4.1
hadoop-aws 3.3.1
The specific error information is as follows:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:63)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase optimization failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
at org.apache.spark.SparkException$.internalError(SparkException.scala:88)
at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:516)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:528)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:139)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:135)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:3418)
at com.vungle.lena.idsp.aggregation_report.SparkMain$.saveReport(SparkMain.scala:233)
at com.vungle.lena.idsp.aggregation_report.SparkMain$.doForEachPeriod(SparkMain.scala:156)
at com.vungle.lena.idsp.aggregation_report.SparkMain$.$anonfun$run$1(SparkMain.scala:67)
at com.vungle.lena.idsp.aggregation_report.SparkMain$.$anonfun$run$1$adapted(SparkMain.scala:63)
at scala.collection.immutable.List.foreach(List.scala:431)
at com.vungle.lena.idsp.aggregation_report.SparkMain$.run(SparkMain.scala:63)
at com.vungle.lena.BoilerplateSparkMain.main(Boilerplate.scala:2625)
at com.vungle.lena.BoilerplateSparkMain.main$(Boilerplate.scala:2605)
at com.vungle.lena.idsp.aggregation_report.SparkMain$.main(SparkMain.scala:7)
at com.vungle.lena.idsp.aggregation_report.SparkMain.main(SparkMain.scala)
... 6 more
Caused by: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at org.apache.spark.sql.execution.datasources.v2.PushablePredicate$.$anonfun$unapply$1(DataSourceV2Strategy.scala:650)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.execution.datasources.v2.PushablePredicate$.unapply(DataSourceV2Strategy.scala:649)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.translateLeafNodeFilterV2(DataSourceV2Strategy.scala:543)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.translateFilterV2WithMapping(DataSourceV2Strategy.scala:596)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.translateFilterV2WithMapping(DataSourceV2Strategy.scala:593)
at org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.$anonfun$pushFilters$3(PushDownUtils.scala:87)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.pushFilters(PushDownUtils.scala:84)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:73)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:60)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
at org.apache.spark.sql.catalyst.plans.logical.Aggregate.mapChildren(basicLogicalOperators.scala:1122)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:456)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.pushDownFilters(V2ScanRelationPushDown.scala:60)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$3(V2ScanRelationPushDown.scala:44)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$8(V2ScanRelationPushDown.scala:51)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:50)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:37)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:143)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
... 34 more
Caused by: org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase optimization failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
I tried to analyze the cause, but found no result.