apache / kyuubi

Apache Kyuubi is a distributed and multi-tenant gateway to provide serverless SQL on data warehouses and lakehouses.
https://kyuubi.apache.org/
Apache License 2.0

[Bug] [AuthZ] Can't update Iceberg table #3515

Closed. lordk911 closed this issue 2 years ago.

lordk911 commented 2 years ago

Describe the bug

Environment: Spark 3.2.1, authz plugin built from master.

Steps to reproduce:

1. Compile authz from the master branch:
   clean package -pl :kyuubi-spark-authz_2.12 -DskipTests -Dspark.version=3.2.1 -Dranger.version=1.2.0
2. Put kyuubi-spark-authz_2.12-1.7.0-SNAPSHOT.jar into $SPARK_HOME/jars
3. Configure Spark:
   spark.sql.extensions org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.kyuubi.sql.KyuubiSparkSQLExtension
   spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog
   spark.sql.catalog.iceberg_catalog.type hive
   spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkSessionCatalog
   spark.sql.catalog.spark_catalog.type hive
4. Start spark-shell
5. spark.sql("create table test.iceberg919(id bigint, name string) USING iceberg")
6. spark.sql("INSERT INTO test.iceberg919 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
7. spark.sql("select * from test.iceberg919").show

+---+----+                                                                      
| id|name|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
+---+----+

8. Run any of the following updates:

spark.sql("update test.iceberg919 set name='aa' where id=1")
spark.sql("update iceberg_catalog.test.iceberg919 set name='aa' where id=1")
spark.sql("update spark_catalog.test.iceberg919 set name='aa' where id=1")

All of them fail with the error "... is not an Iceberg table".

Affects Version(s)

master

Kyuubi Server Log Output

spark.sql("update test.iceberg919 set name='aa' where id=1")
org.apache.spark.sql.AnalysisException: Project [id#25L, name#26]
+- RowFilterAndDataMaskingMarker
   +- RelationV2[id#25L, name#26] spark_catalog.test.iceberg919
 is not an Iceberg table
  at org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable$$anonfun$apply$1.applyOrElse(RewriteUpdateTable.scala:71)
  at org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable$$anonfun$apply$1.applyOrElse(RewriteUpdateTable.scala:53)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable$.apply(RewriteUpdateTable.scala:53)
  at org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable$.apply(RewriteUpdateTable.scala:51)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:222)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:218)
  at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:167)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:218)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:182)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:203)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
  ... 47 elided

Kyuubi Engine Log Output

No response

Kyuubi Server Configurations

No response

Kyuubi Engine Configurations

No response

Additional context

No response

Are you willing to submit PR?

lordk911 commented 2 years ago

@bowenliang123

bowenliang123 commented 2 years ago

Thanks for reporting. Let me have a check.

minyk commented 2 years ago

In our QA environment, the explain output of a similar update looks like this:

EXPLAIN EXTENDED UPDATE spkui05_1_s3a_mig SET field_one = 'one' WHERE field_three = 'two';

== Parsed Logical Plan ==
'UpdateIcebergTable [assignment('field_one, one)], ('field_three = two)
+- 'UnresolvedRelation [spkui05_1_s3a_mig], [], false

== Analyzed Logical Plan ==
UpdateIcebergTable [assignment(banana_id#779, banana_id#779), assignment(field_one#780, one), assignment(field_two#781, field_two#781), assignment(field_three#782, field_three#782), assignment(date_created#783, date_created#783)], (field_three#782 = two)
:- RelationV2[banana_id#779, field_one#780, field_two#781, field_three#782, date_created#783] spark_catalog.spkui01.spkui05_1_s3a_mig
+- ReplaceData RelationV2[banana_id#779, field_one#780, field_two#781, field_three#782, date_created#783] spark_catalog.spkui01.spkui05_1_s3a_mig
   +- Project [if ((field_three#782 = two)) banana_id#779 else banana_id#779 AS banana_id#788, if ((field_three#782 = two)) one else field_one#780 AS field_one#789, if ((field_three#782 = two)) field_two#781 else field_two#781 AS field_two#790, if ((field_three#782 = two)) field_three#782 else field_three#7...

Maybe the problem is that RowFilterAndDataMaskingMarker replaces UpdateIcebergTable in the plan.
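
For context, a marker node of this kind is usually just a pass-through unary node wrapped around the relation; a minimal sketch (the class name matches the plan above, the body is an assumption, not Kyuubi's actual source):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

// Assumed shape of the marker: it adds no semantics of its own and simply
// forwards the child's output, but it is no longer a DataSourceV2Relation,
// which is what Iceberg's rewrite rules pattern-match on.
case class RowFilterAndDataMaskingMarker(child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(child = newChild)
}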

@lordk911 can you share your explain statement?

bowenliang123 commented 2 years ago

@minyk Running EXPLAIN on the update SQL causes the same failure. The exception comes from org.apache.spark.sql.catalyst.analysis.RewriteUpdateTable in Iceberg's Spark extension: RewriteUpdateTable fails to recognize RowFilterAndDataMaskingMarker as an instance of DataSourceV2Relation when the plan of UpdateIcebergTable is rewritten.
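
Roughly, the check inside that rule behaves like the following (a paraphrased sketch based on the stack trace, not the exact Iceberg source; relationOrFail is only an illustrative name):

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Paraphrased sketch of why the error appears: the rule strips subquery aliases
// from the UPDATE target and then insists on a bare DataSourceV2Relation.
// With RowFilterAndDataMaskingMarker wrapped around the relation, the fallback
// branch fires and reports "... is not an Iceberg table".
def relationOrFail(table: LogicalPlan): DataSourceV2Relation =
  EliminateSubqueryAliases(table) match {
    case r: DataSourceV2Relation => r
    case p => throw new AnalysisException(s"$p is not an Iceberg table")
  }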

yaooqinn commented 2 years ago
case class UpdateIcebergTable(
    table: LogicalPlan,
    assignments: Seq[Assignment],
    condition: Option[Expression],
    rewritePlan: Option[LogicalPlan] = None) extends RowLevelCommand {

  lazy val aligned: Boolean = AssignmentUtils.aligned(table, assignments)

  override def children: Seq[LogicalPlan] = if (rewritePlan.isDefined) {
    table :: rewritePlan.get :: Nil
  } else {
    table :: Nil
  }
  // ... rest of the class omitted
}

Spark's own v2 update command does not expose the table field as a child, so we do not apply our marker to it and everything works. But Iceberg's custom command adds the table to children, so we apply our marker on top of it and the Iceberg rule fails.
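
To illustrate the difference, a minimal sketch of a rule that wraps every v2 relation it can reach (an assumed shape, not the actual Kyuubi rule; it reuses the marker class sketched earlier in this thread):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Assumed sketch, not Kyuubi's actual rule: transformUp only visits nodes that are
// reachable through `children`, so the relation under UpdateIcebergTable gets wrapped
// with the marker, while a command that does not expose its table as a child is untouched.
def applyMarker(plan: LogicalPlan): LogicalPlan = plan transformUp {
  case r: DataSourceV2Relation => RowFilterAndDataMaskingMarker(r) // marker class as sketched earlier
}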

bowenliang123 commented 2 years ago

It should be fixed now on the master branch. Please verify when you have some time. @lordk911
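
If it helps, a quick way to re-check from spark-shell after rebuilding the plugin from master, reusing the table from the report:

// Re-run the statement that previously failed; it should now pass analysis and update the row.
spark.sql("update test.iceberg919 set name='aa' where id=1")
spark.sql("select * from test.iceberg919").show()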