apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
6.37k stars 2.2k forks source link

Query fails when executed without filter i.e. aggregate pushdown #8859

Closed atifiu closed 1 week ago

atifiu commented 1 year ago

Apache Iceberg version

1.3.0

Query engine

Spark

Please describe the bug 🐞

Below query on Iceberg table fails:

spark.sql(f""" select max(visitor_id) from schema.table1 """).show()

But if we put a filter whether selecting all the partitions or single partition, query executes successfully.

@huaxingao @RussellSpitzer In case you can give some feedback on the same.

Below is the error log for the same:

y4j.protocol.Py4JJavaError: An error occurred while calling o103.showString.
: java.lang.RuntimeException: Failed while running parallel task
    at org.apache.iceberg.util.ParallelIterable$ParallelIterator.checkTasks(ParallelIterable.java:118)
    at org.apache.iceberg.util.ParallelIterable$ParallelIterator.hasNext(ParallelIterable.java:162)
    at org.apache.iceberg.relocated.com.google.common.collect.ImmutableCollection$Builder.addAll(ImmutableCollection.java:482)
    at org.apache.iceberg.relocated.com.google.common.collect.ImmutableList$Builder.addAll(ImmutableList.java:882)
    at org.apache.iceberg.relocated.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:282)
    at org.apache.iceberg.relocated.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:239)
    at org.apache.iceberg.spark.source.SparkScanBuilder.pushAggregation(SparkScanBuilder.java:235)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownAggregates$1.$anonfun$applyOrElse$10(V2ScanRelationPushDown.scala:165)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownAggregates$1.$anonfun$applyOrElse$10$adapted(V2ScanRelationPushDown.scala:165)
    at scala.Option.filter(Option.scala:289)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownAggregates$1.applyOrElse(V2ScanRelationPushDown.scala:165)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownAggregates$1.applyOrElse(V2ScanRelationPushDown.scala:93)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:528)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.pushDownAggregates(V2ScanRelationPushDown.scala:93)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$4(V2ScanRelationPushDown.scala:45)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$7(V2ScanRelationPushDown.scala:50)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:49)
    at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:37)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:126)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.HashMap.resize(HashMap.java:704)
    at java.util.HashMap.putVal(HashMap.java:629)
    at java.util.HashMap.putMapEntries(HashMap.java:515)
    at java.util.HashMap.putAll(HashMap.java:785)
    at org.apache.iceberg.util.SerializableMap.<init>(SerializableMap.java:39)
    at org.apache.iceberg.util.SerializableMap.copyOf(SerializableMap.java:43)
    at org.apache.iceberg.BaseFile.<init>(BaseFile.java:185)
    at org.apache.iceberg.GenericDataFile.<init>(GenericDataFile.java:71)
    at org.apache.iceberg.GenericDataFile.copy(GenericDataFile.java:84)
    at org.apache.iceberg.GenericDataFile.copy(GenericDataFile.java:28)
    at org.apache.iceberg.ContentFile.copy(ContentFile.java:177)
    at org.apache.iceberg.ManifestGroup.lambda$createFileScanTasks$13(ManifestGroup.java:350)
    at org.apache.iceberg.ManifestGroup$$Lambda$2303/1115830363.apply(Unknown Source)
    at org.apache.iceberg.io.CloseableIterable$7$1.next(CloseableIterable.java:202)
    at org.apache.iceberg.util.ParallelIterable$ParallelIterator.lambda$new$1(ParallelIterable.java:69)
    at org.apache.iceberg.util.ParallelIterable$ParallelIterator$$Lambda$2294/745105277.run(Unknown Source)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
huaxingao commented 1 year ago

@atifiu Do you have a program that can reproduce the issue?

atifiu commented 1 year ago

@huaxingao No. Because this is happening only with large table 4TB+. For smaller tables it's working fine. Even on large table if I place filter it works.

paulpaul1076 commented 1 year ago

There should be a wrapped exception and explains what exactly happened in the parallel task.

atifiu commented 1 year ago

@paulpaul1076 I have now pasted the complete error stack.

huaxingao commented 1 year ago

Seems it caused by OOM

atifiu commented 1 year ago

@huaxingao I am unable to understand why OOM when not filtering the data but when filtering the data with selecting all the partitions without skipping anything works fine. Ideally in both cases we are referring to same amount of data.

paulpaul1076 commented 1 year ago

@atifiu have you tried giving your application more memory? How much memory are you specifying in spark submit?

github-actions[bot] commented 4 weeks ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 1 week ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'