apache / kyuubi

Apache Kyuubi is a distributed and multi-tenant gateway to provide serverless SQL on data warehouses and lakehouses.
https://kyuubi.apache.org/
Apache License 2.0
[Bug] Issue when applying Masking policies on Iceberg Tables #4202

Closed praveenkumarb1207 closed 1 year ago

praveenkumarb1207 commented 1 year ago

Describe the bug

Following the instructions in https://github.com/apache/kyuubi/blob/master/docs/security/authorization/spark/install.md, I installed the Kyuubi plugin by copying all the jars and required configuration files mentioned there to $SPARK_HOME.

Version information:

Ranger: 2.3.0
Spark: 3.2.3 with Hadoop 3.3.4
Apache Hive: 3.1.2

I created an Iceberg table in Hive using Spark.

spark-shell command:

spark-shell --packages "org.apache.spark:spark-hive_2.12:3.2.3,org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0" \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.demo.type=hadoop \
    --conf spark.sql.catalog.demo.warehouse=$PWD/warehouse --proxy-user pb1207  

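Code:

import spark.implicits._

// Build a small sample DataFrame with two columns.
val df = Seq(
  (1, "America"),
  (2, "India"),
  (3, "London")
).toDF("id", "country")

// Write it as an Iceberg table registered in the Hive-backed spark_catalog.
df.write.format("iceberg").mode("overwrite").option("path","s3a:/bucket/test/test_iceberg.parquet").saveAsTable("test.test_iceberg_table")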

The Iceberg table was created successfully in Hive.

Code:

spark.sql("show create table test.test_iceberg_table").show(false)

Output:

image

Selecting from the table in Spark:

Code:

spark.sql("select * from test.test_iceberg_table").show(false)

Output:

image

I created an access policy in Ranger on the Iceberg table, and it worked as expected.

Ranger Policy :

image

image

Output from spark :

image

I then granted access to the table in Ranger and created a row-level filtering policy on the Iceberg table. This also worked as expected.

Ranger Policy :

image

image

Output from spark :

image

However, when I create a masking policy, I get the error below.

Ranger policy :

image

Output :

image

Error :

org.apache.spark.sql.AnalysisException: Resolved attribute(s) country#114 missing from id#113,country#115 in operator !Project [id#113, country#114]. Attribute(s) with the same name appear in the operation: country. Please check if the right attribute(s) are used.;
!Project [id#113, country#114]
+- SubqueryAlias spark_catalog.test.test_iceberg_table
   +- Project [id#113, null AS country#115]
      +- Filter (id#113 > 1)
         +- RowFilterAndDataMaskingMarker
            +- RelationV2[id#113, country#114] spark_catalog.test.test_iceberg_table

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:52)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis$(CheckAnalysis.scala:51)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:182)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:474)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:97)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:97)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:92)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:182)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:205)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:75)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:75)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:65)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
  ... 54 elided
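
To make the message above easier to read, here is a minimal toy model in plain Scala (no Spark dependency) of the check that fails. It assumes only what the error text and the plan show: the outer Project reads country#114, while the masking Project below it emits `null AS country#115`, and attributes are matched by their numeric id rather than by name. The names `Attr`, `Relation`, `Project`, and `missingInputs` are hypothetical and are not part of Spark or Kyuubi.

```scala
// Toy model of attribute resolution by id; this is NOT Spark or Kyuubi code.
// All types and functions below are hypothetical, for illustration only.
final case class Attr(name: String, exprId: Int)

sealed trait Plan { def output: Seq[Attr] }
final case class Relation(output: Seq[Attr]) extends Plan
// A projection reads `references` from its child and emits `output`.
final case class Project(references: Seq[Attr], output: Seq[Attr], child: Plan) extends Plan

// Attributes a Project reads that its child does not produce.
// Matching is by exprId, not by name, mirroring the error message.
def missingInputs(p: Project): Seq[Attr] = {
  val available = p.child.output.map(_.exprId).toSet
  p.references.filterNot(a => available.contains(a.exprId))
}

val id = Attr("id", 113)
val country = Attr("country", 114)
val table = Relation(Seq(id, country))

// Masking step as in the failing plan: the masked column gets a fresh id (115).
val maskedFresh = Project(Seq(id), Seq(id, Attr("country", 115)), table)
// The outer projection still refers to the original country#114.
val outerBroken = Project(Seq(id, country), Seq(id, country), maskedFresh)

// Variant where the masked column keeps the original id (114).
val maskedSameId = Project(Seq(id), Seq(id, country), table)
val outerOk = Project(Seq(id, country), Seq(id, country), maskedSameId)

println(missingInputs(outerBroken)) // country with id 114 is reported missing
println(missingInputs(outerOk))     // nothing is reported missing
```

In this model the first case reproduces the shape of the reported failure: a column named country exists below the outer projection, but under a different id. The second case only shows that the check passes when the ids line up. Whether preserving the id is the appropriate fix in the Authz plugin is not established in this report.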

Could you please look into this issue?

Affects Version(s)

master

Kyuubi Server Log Output

No response

Kyuubi Engine Log Output

No response

Kyuubi Server Configurations

No response

Kyuubi Engine Configurations

No response

Additional context

No response

bowenliang123 commented 1 year ago

Are you compiling and using the Authz module from the master branch?

praveenkumarb1207 commented 1 year ago

Yes, I am using the master branch.

bowenliang123 commented 1 year ago

Okay, more investigation is required.

Row filtering on Iceberg is covered in IcebergCatalogRangerSparkExtensionSuite, which uses org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions:
https://github.com/apache/kyuubi/blob/master/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala#L174

Have a look at it if you have time to do more testing. Feel free to share any details or findings.

praveenkumarb1207 commented 1 year ago

Hi @bowenliang123,

Just to be clear, we are facing the issue only with masking on Iceberg tables. Row-level filtering is working as expected.

bowenliang123 commented 1 year ago

Okay, noted. Also cc @yaooqinn