apache/paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

SparkGenericCatalog does not support Iceberg #2292

Open · melin opened 11 months ago

melin commented 11 months ago

Search before asking

Paimon version

0.5.0-incubating

Compute Engine

Spark (local mode, with Hive support). Reproduction code:
        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder()
          .master("local")
          .enableHiveSupport()
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .config("spark.sql.catalog.hudi_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
          // Note: the same key is set twice below; the second value overrides the
          // first, so iceberg_catalog ends up as SparkSessionCatalog.
          .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
          .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
          .config("spark.sql.catalog.iceberg_catalog.type", "hive")
          .config("spark.sql.catalog.iceberg_catalog.uri", "thrift://cdh2:9083")
          .config("spark.sql.catalog.hive_metastore", "org.apache.paimon.spark.SparkGenericCatalog")
          .getOrCreate()

        println("hello hudi")
        spark.sql("select * from hive_metastore.bigdata.hudi_sample_1").show()
        println("hello paimon")
        spark.sql("select * from hive_metastore.bigdata.paimon_sample").show()
        println("hello iceberg")
        spark.sql("select * from hive_metastore.bigdata.iceberg_sample_1").show()

Minimal reproduce step

Running the code above produces:
hello hudi
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|data|
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+

hello paimon
+---+---+
|  k|  v|
+---+---+
+---+---+

hello iceberg
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
    at org.example.spark.datasource.hudi.PaimonTest1$.main(PaimonTest1.scala:22)
    at org.example.spark.datasource.hudi.PaimonTest1.main(PaimonTest1.scala)
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.sql.AnalysisException: iceberg is not a valid Spark SQL Data Source.
    at org.sparkproject.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at org.sparkproject.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
    at org.sparkproject.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at org.sparkproject.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at org.sparkproject.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
    at org.sparkproject.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.sparkproject.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getCachedPlan(SessionCatalog.scala:209)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable.org$apache$spark$sql$execution$datasources$FindDataSourceTable$$readDataSourceTable(DataSourceStrategy.scala:245)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:285)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable$$anonfun$apply$2.applyOrElse(DataSourceStrategy.scala:275)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$4(AnalysisHelper.scala:175)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
    at org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.mapChildren(basicLogicalOperators.scala:1631)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:175)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$4(AnalysisHelper.scala:175)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
    at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:69)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:175)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:31)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:275)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable.apply(DataSourceStrategy.scala:239)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:95)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
    at org.example.spark.datasource.hudi.PaimonTest1$$anon$1.run(PaimonTest1.scala:45)
    at org.example.spark.datasource.hudi.PaimonTest1$$anon$1.run(PaimonTest1.scala:22)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
    ... 2 more
Caused by: org.apache.spark.sql.AnalysisException: iceberg is not a valid Spark SQL Data Source.
    at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidDataSourceError(QueryCompilationErrors.scala:1457)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:420)
    at org.apache.spark.sql.execution.datasources.FindDataSourceTable.$anonfun$readDataSourceTable$1(DataSourceStrategy.scala:257)
    at org.sparkproject.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
    at org.sparkproject.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    ... 84 more

Added the spark-iceberg-bundle: (screenshot)
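The screenshot is not preserved. One common way to attach the Iceberg runtime is via spark.jars.packages, sketched here with illustrative artifact coordinates (they must match the Spark and Scala versions in use):

        // Illustrative only; pick the iceberg-spark-runtime artifact matching your build.
        val sparkWithIceberg = SparkSession.builder()
          .master("local")
          .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.2")
          .getOrCreate()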

What doesn't meet your expectations?

The hive_metastore catalog should support access to Paimon, Hudi, and Iceberg tables alike.

Anything else?

No response

Are you willing to submit a PR?

melin commented 11 months ago

Iceberg tables created by SparkGenericCatalog are broken: bigdata.iceberg_sample_1. By contrast, bigdata.iceberg_sample_2 was created through the Iceberg catalog and is correct. (screenshot)

@JingsongLi
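
The screenshots are not preserved; a hedged guess at the kind of DDL involved, routed through the generic catalog (the column list is illustrative only):

        // Hypothetical reconstruction; the actual DDL from the screenshot is not preserved.
        spark.sql(
          """CREATE TABLE hive_metastore.bigdata.iceberg_sample_1 (id INT, data STRING)
            |USING iceberg""".stripMargin)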

Zouxxyy commented 11 months ago

@melin How can you use SparkGenericCatalog to create an Iceberg table?

melin commented 11 months ago

> @melin How can you use SparkGenericCatalog to create an Iceberg table?

(screenshot)

Creating a Hudi table works fine.

Zouxxyy commented 11 months ago

> Creating a Hudi table works fine.

Creating a Hudi table does not strongly depend on the catalog.
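
A hedged reading of that point: Hudi ships an ordinary Spark SQL data source, so DDL routed through the generic catalog's session-catalog fallback still resolves, whereas Iceberg tables can only be resolved through an Iceberg-aware catalog, which matches the "iceberg is not a valid Spark SQL Data Source" failure above. A sketch of the contrast (the hudi_demo table name is hypothetical):

        // Works: "hudi" resolves as a regular Spark SQL data source.
        spark.sql("CREATE TABLE hive_metastore.bigdata.hudi_demo (id INT) USING hudi")
        // Fails as reported: the session-catalog fallback cannot resolve provider "iceberg".
        spark.sql("SELECT * FROM hive_metastore.bigdata.iceberg_sample_1").show()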

melin commented 10 months ago

> Creating a Hudi table does not strongly depend on the catalog.

Can this issue be fixed?