Closed bihaiyang closed 3 years ago
I encountered the same error when using the MERGE INTO clause, and I found that if the target table does not match any data, an IllegalArgumentException is thrown.
For example, I have some logs in a source table, and I need to aggregate the logs into events and write them to a destination table.
src (user_logs) table:

| log_time | log_type | user_name |
|---|---|---|
| 2021-05-06 12:01:00 | heartbeat | Bob |
| 2021-05-06 12:03:00 | heartbeat | Bob |
| 2021-05-06 12:05:00 | heartbeat | Bob |
| 2021-05-06 12:05:00 | login | John |
| 2021-05-06 12:05:30 | heartbeat | John |
| 2021-05-06 12:06:00 | heartbeat | John |
| 2021-05-06 12:06:30 | logout | John |

dst (user_events) table:

| start_time | end_time | user_name |
|---|---|---|
| 2021-05-06 12:01:00 | 2021-05-06 12:03:00 | Bob |
If I execute the following SQL, the error occurs.
MERGE INTO iceberg.iceberg_db.user_events AS dst
USING(
SELECT MIN(log_time) AS start_time,
MAX(log_time) AS end_time,
user_name
FROM iceberg.iceberg_db.user_logs
WHERE log_time >= '2021-05-06 12:05:00'
AND log_time < '2021-05-06 12:10:00'
GROUP BY user_name
) AS src
ON src.user_name = dst.user_name
AND src.start_time = dst.end_time
WHEN MATCHED THEN UPDATE SET dst.end_time = src.end_time
WHEN NOT MATCHED THEN INSERT *;
And the stack trace is as follows:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(1, 0)
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:58)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:366)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.writeWithV2(ReplaceDataExec.scala:26)
at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.run(ReplaceDataExec.scala:34)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
But if I change the log_time range to [2021-05-06 12:03:00, 2021-05-06 12:10:00), the SQL succeeds.
I guess it's because the dst table contains a record with user_name = 'Bob' AND end_time = '2021-05-06 12:03:00', which matches the src.user_name = dst.user_name AND src.start_time = dst.end_time condition and keeps the dst side from being empty.
My environment is Spark 3.0.2 and Iceberg master. This can be reproduced by executing the SQL below:
CREATE TABLE iceberg.iceberg_db.user_logs (
log_time STRING,
log_type STRING,
user_name STRING
) USING iceberg
TBLPROPERTIES (engine.hive.enabled='true');
INSERT INTO iceberg.iceberg_db.user_logs
VALUES ('2021-05-06 12:01:00', 'heartbeat', 'Bob'),
('2021-05-06 12:03:00', 'heartbeat', 'Bob'),
('2021-05-06 12:05:00', 'heartbeat', 'Bob'),
('2021-05-06 12:05:00', 'login', 'John'),
('2021-05-06 12:05:30', 'heartbeat', 'John'),
('2021-05-06 12:06:00', 'heartbeat', 'John'),
('2021-05-06 12:06:30', 'logout', 'John');
CREATE TABLE iceberg.iceberg_db.user_events (
start_time STRING,
end_time STRING,
user_name STRING
) USING iceberg
TBLPROPERTIES (engine.hive.enabled='true');
INSERT INTO iceberg.iceberg_db.user_events
VALUES ('2021-05-06 12:01:00', '2021-05-06 12:03:00', 'Bob');
MERGE INTO iceberg.iceberg_db.user_events AS dst
USING(
SELECT MIN(log_time) AS start_time,
MAX(log_time) AS end_time,
user_name
FROM iceberg.iceberg_db.user_logs
WHERE log_time >= '2021-05-06 12:05:00'
AND log_time < '2021-05-06 12:10:00'
GROUP BY user_name
) AS src
ON src.user_name = dst.user_name
AND src.start_time = dst.end_time
WHEN MATCHED THEN UPDATE SET dst.end_time = src.end_time
WHEN NOT MATCHED THEN INSERT *;
MERGE INTO iceberg.iceberg_db.user_events AS dst
USING(
SELECT MIN(log_time) AS start_time,
MAX(log_time) AS end_time,
user_name
FROM iceberg.iceberg_db.user_logs
WHERE log_time >= '2021-05-06 12:03:00'
AND log_time < '2021-05-06 12:10:00'
GROUP BY user_name
) AS src
ON src.user_name = dst.user_name
AND src.start_time = dst.end_time
WHEN MATCHED THEN UPDATE SET dst.end_time = src.end_time
WHEN NOT MATCHED THEN INSERT *;
Experiment result:
cc @RussellSpitzer @flyrain @karuppayya @kbendick, has anyone seen anything similar? Can anybody investigate?
Looks like there is special casing for empty RDDs which generates a single-partition RDD from a 0-partition RDD.
My guess is that something in our code is invoking a ZippedRDD (probably a broadcast join?) which ends up having no data in it. That gives a ZippedRDD whose input has 0 partitions, and there is an assertion that you can't have such an RDD, which fires when we call .partitions.
I'm going to see if I can figure out where we generate this RDD; basically we need to make sure our underlying query.execute does not produce a zipped RDD when one side is empty.
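For reference, a minimal standalone sketch of the underlying failure, assuming a plain spark-shell session where sc is the SparkContext (the variable names here are made up for illustration):

// Zipping a 0-partition RDD with a 1-partition RDD fails as soon as
// .partitions is evaluated, with the same error as in the MERGE above.
val empty  = sc.emptyRDD[Int]            // 0 partitions
val single = sc.parallelize(Seq(1), 1)   // 1 partition
val zipped = empty.zipPartitions(single) { (l, r) => l ++ r }
zipped.partitions
// java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(0, 1)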
OK, so here is what happens:
We build up a plan which contains a sort merge join.
The sort merge join is implemented as a zipPartitions RDD:
result = {MapPartitionsRDD@21132} "MapPartitionsRDD[49] at sql at <console>:26"
isBarrier_ = false
prev = {ZippedPartitionsRDD2@21135} "ZippedPartitionsRDD2[48] at sql at <console>:26"
f = {SortMergeJoinExec$lambda@21156} "org.apache.spark.sql.execution.joins.SortMergeJoinExec$$Lambda$3615/2092599862@10710787"
arg = {SortMergeJoinExec@21170} "SortMergeJoin [user_name#39, start_time#32], [user_name#36, end_time#35], FullOuter\n:- *(2) Sort [user_name#39 ASC NULLS FIRST, start_time#32 ASC NULLS FIRST], false, 0\n: +- SortAggregate(key=[user_name#39], functions=[min(log_time#37), max(log_time#37)], output=[start_time#32, end_time#33, user_name#39, _row_from_source_#42])\n: +- SortAggregate(key=[user_name#39], functions=[partial_min(log_time#37), partial_max(log_time#37)], output=[user_name#39, min#60, max#61])\n: +- *(1) Sort [user_name#39 ASC NULLS FIRST], false, 0\n: +- *(1) Project [log_time#37, user_name#39]\n: +- *(1) Filter ((isnotnull(log_time#37) AND (log_time#37 >= 2021-05-06 12:05:00)) AND (log_time#37 < 2021-05-06 12:10:00))\n: +- BatchScan[log_time#37, user_name#39] iceberg.iceberg_db.user_logs [filters=log_time IS NOT NULL, log_time >= '2021-05-06 12:05:00', log_time < '2021-05-06 12:10:00']\n+- *(8) Sort [user_name#36 ASC NULLS FIRST, end_time#35 ASC NULLS FIRST], false,"
This is because SortMergeJoinExec does the zipPartitions, and that code is not built to handle zipping a 0-partition RDD with a 1-partition RDD.
I think we basically just need to preempt our creation of the join plan if either side of the join is an empty RDD and bail out in the merge planning. Or we have to never create 0-partition RDDs from our sources...
Spark normally just won't plan a join when one side is empty; a quick demonstration:
scala> val left = spark.createDataset(Seq.empty[Row])
left: org.apache.spark.sql.Dataset[Row] = [i: int]
scala> val right = spark.createDataset(Seq(Row(1)))
right: org.apache.spark.sql.Dataset[Row] = [i: int]
scala> left.join(right, left("i") === right("i"), "left_outer").show
+---+---+
| i| i|
+---+---+
+---+---+
scala> left.join(right, left("i") === right("i"), "left_outer").queryExecution.logical
res14: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Join LeftOuter, (i#133 = i#137)
:- LocalRelation <empty>, [i#133]
+- LocalRelation [i#137]
scala> left.join(right, left("i") === right("i"), "left_outer").queryExecution.sparkPlan
res15: org.apache.spark.sql.execution.SparkPlan =
LocalTableScan <empty>, [i#133, i#137]
I think this rule is fixing it for Spark
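To see where the collapse happens in the same spark-shell session as above, one can check the optimized logical plan (a sketch; the exact expression IDs will differ):

// Same left/right datasets as in the demonstration above: the optimizer
// already replaces the empty-sided join before a physical join is chosen.
left.join(right, left("i") === right("i"), "left_outer").queryExecution.optimizedPlan
// expected to show an empty relation in place of the join, e.g.
// LocalRelation <empty>, [i#..., i#...]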
I think @RussellSpitzer got it correctly and will submit a fix.
@Reo-LEI This patch cannot pass my test case; I think it's my configuration that's wrong. Can I get your contact information? Email, QQ, or something else? Thank you very much.
--jars /opt/spark/jars/iceberg-spark3-runtime-30ec9b5.jar
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog

I used a decompiler tool to make sure the code was correct. Whether I use my test case or @Reo-LEI's test case, it didn't work. Can I talk to you for a minute, please?
sure, you can contact me via leinuowen@gmail.com
Thank you for your help
@RussellSpitzer
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
Thank you for your help. I want to confirm whether this parameter is required, because my current Spark architecture is Spark ThriftServer & Alluxio & OSS and there is no hive.metastore.uri. Does your logic have to be configured with this class (org.apache.iceberg.spark.SparkCatalog) to execute? I only configure:
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
I think I've found the problem. Thank you very much for your follow-up!! @RussellSpitzer @aokolnychyi @Reo-LEI
@221770490011111 Hello, I got the same exception. What is the key to solving this?
@KarlManong You can build Iceberg from master, or wait for the 0.12 release. The fix is in tree but not yet in a released version
@RussellSpitzer I was running on https://github.com/apache/iceberg/commit/b3fb81a19e9632608b2bd4cdede4bbfcbd26bc8e, but still got this exception. Am I missing something?
How were you running on that build? My first guess would be that that version wasn't actually present at runtime
@RussellSpitzer I rebuilt a table with exactly the same statement (using Trino), and everything worked well. The only difference is that the old table has some data.
The failed logs: s-bigdata-402-5918f407-84557c7aa8cf66dc-driver-spark-kubernetes-driver-log.txt
The successful logs: s-bigdata-402-21809172-57074b7aa8d5269c-driver-spark-kubernetes-driver-log.txt
The SQL: create table.txt merge.txt
I ran the failing SQL on spark-thriftserver, and it worked. Maybe the old application has some problem.
This is table test_merge_002:
This is table test_merge_001:
But when I run this SQL, the error appears.