
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

How to set Spark conf to use Parquet and Iceberg tables with the Glue catalog without a catalog name (spark_catalog)? #7748

Open ryu3065 opened 1 year ago

ryu3065 commented 1 year ago

Query engine

Using

AWS EMR 6.10.0 
Spark 3.3.1 
Iceberg 1.1.0-amzn-0
AWS Glue catalog

Spark-defaults

'spark.sql.extensions': 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions',
'spark.sql.catalog.spark_catalog': 'org.apache.iceberg.spark.SparkCatalog',
'spark.sql.catalog.spark_catalog.catalog-impl': 'org.apache.iceberg.aws.glue.GlueCatalog',
'spark.sql.catalog.spark_catalog.warehouse': 's3://{MY_BUCKET}',
'spark.sql.catalog.spark_catalog.io-impl': 'org.apache.iceberg.aws.s3.S3FileIO',
'spark.sql.catalog.spark_catalog.lock-impl': 'org.apache.iceberg.aws.dynamodb.DynamoDbLockManager',
'spark.sql.catalog.spark_catalog.lock.table': '{MY_LOCKTABLE}',
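
For reference, the same settings can also be applied programmatically when building the SparkSession (for example in a Scala application or notebook). This is only a sketch of the equivalent configuration; {MY_BUCKET} and {MY_LOCKTABLE} stay as placeholders, as above.

  import org.apache.spark.sql.SparkSession

  // Sketch: the spark-defaults above expressed on a SparkSession builder.
  // The bucket and DynamoDB lock-table names are placeholders.
  val spark = SparkSession.builder()
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.spark_catalog.warehouse", "s3://{MY_BUCKET}")
    .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.spark_catalog.lock-impl", "org.apache.iceberg.aws.dynamodb.DynamoDbLockManager")
    .config("spark.sql.catalog.spark_catalog.lock.table", "{MY_LOCKTABLE}")
    .getOrCreate()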

Question

The Iceberg table works as expected with the configuration I set, but the Parquet table that was used previously no longer works and fails with the error message below.

Iceberg table: spark.sql("select * from db.iceberg_table").show() works well.

but,

Parquet table: spark.sql("select * from db.parquet_table").show() raises an error.

org.apache.iceberg.exceptions.ValidationException: Input Glue table is not an iceberg table: spark_catalog.db.parquet_table (type=null)

So I tried setting the Spark conf spark.conf.set("spark.sql.catalog.spark_catalog.type", "hive") (and also glue, hadoop, and so on), but the error is the same.

org.apache.iceberg.exceptions.ValidationException: Input Glue table is not an iceberg table: spark_catalog.db.parquet_table (type=null)
  at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
  at org.apache.iceberg.aws.glue.GlueToIcebergConverter.validateTable(GlueToIcebergConverter.java:48)
  at org.apache.iceberg.aws.glue.GlueTableOperations.doRefresh(GlueTableOperations.java:127)
  at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
  at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
  at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:44)
  at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
  at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
  at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
  at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
  at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
  at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
  at org.apache.iceberg.CachingCatalog.loadTable(CachingCatalog.java:166)
  at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:608)
  at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:145)
  at org.apache.iceberg.spark.SparkSessionCatalog.loadTable(SparkSessionCatalog.java:134)
  at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:311)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$3(Analyzer.scala:1198)
  at scala.Option.orElse(Option.scala:447)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$1(Analyzer.scala:1197)
  at scala.Option.orElse(Option.scala:447)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1189)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$13.applyOrElse(Analyzer.scala:1060)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$13.applyOrElse(Analyzer.scala:1024)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:179)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1282)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1281)
  at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:227)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1282)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1281)
  at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:227)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1282)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1281)
  at org.apache.spark.sql.catalyst.plans.logical.GlobalLimit.mapChildren(basicLogicalOperators.scala:1625)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1024)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:983)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:215)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:212)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$6(RuleExecutor.scala:284)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$RuleExecutionContext$.withContext(RuleExecutor.scala:327)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5(RuleExecutor.scala:284)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5$adapted(RuleExecutor.scala:274)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:274)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:188)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:227)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:223)
  at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:172)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:223)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:187)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:208)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:79)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:214)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:554)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:214)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:213)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:69)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
  at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
  ... 44 elided

How can I set the Spark conf so that both Parquet and Iceberg tables work with the Glue catalog without a catalog name (spark_catalog)?

akshayakp97 commented 1 year ago

Could you try using SparkSessionCatalog instead of SparkCatalog? In your config: "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog"
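
As a minimal sketch of that suggestion (assuming the remaining spark.sql.catalog.spark_catalog.* Glue settings from the original config are kept), only the catalog class changes; SparkSessionCatalog is intended to fall back to Spark's built-in session catalog for non-Iceberg tables:

  // Sketch: swap SparkCatalog for SparkSessionCatalog on spark_catalog so that
  // non-Iceberg (e.g. Parquet) tables are delegated to the built-in session catalog.
  val spark = SparkSession.builder()
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    // ... remaining spark.sql.catalog.spark_catalog.* settings as in the original config ...
    .getOrCreate()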

ryu3065 commented 1 year ago

Could you try using SparkSessionCatalog instead of SparkCatalog? In your config: "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog"

@akshayakp97 Hi! Although I have set SparkSessionCatalog, it still does not work for my Parquet table, but Iceberg tables work well.

spark.conf.get("spark.sql.catalog.spark_catalog")
res1: String = org.apache.iceberg.spark.SparkSessionCatalog

github-actions[bot] commented 10 months ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

andreaschiappacasse commented 4 months ago

We are running into the same problem. @ryu3065, have you managed to find a solution? We would like to run a MERGE query that reads from Parquet tables and writes to an Iceberg table, but we have not managed to make it work yet.
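
One possible workaround (a sketch only, not a confirmed fix for this issue): register the Iceberg Glue catalog under a separate name and leave spark_catalog untouched, so existing Parquet tables keep resolving through the built-in session catalog while the MERGE writes to the Iceberg table through the named catalog. The catalog name glue_iceberg and the table/column names below are hypothetical.

  // Sketch: a separately named Iceberg catalog ("glue_iceberg" is a made-up name);
  // spark_catalog is not overridden, so Parquet tables resolve as before.
  val spark = SparkSession.builder()
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_iceberg.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_iceberg.warehouse", "s3://{MY_BUCKET}")
    .config("spark.sql.catalog.glue_iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()

  // The MERGE reads the Parquet source from the session catalog and writes to the
  // Iceberg target through the named catalog (db/table/column names are placeholders).
  spark.sql("""
    MERGE INTO glue_iceberg.db.iceberg_table t
    USING db.parquet_table s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)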

sivakanthavel-tigeranalytics commented 4 months ago

Hello,

I am using org.apache.iceberg.spark.SparkSessionCatalog instead of SparkCatalog. I can create both Iceberg and non-Iceberg tables in the Glue catalog. However, when I try to execute SHOW TABLE EXTENDED db_name LIKE (table_name) on an Iceberg table, it throws this error: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table. StorageDescriptor#InputFormat cannot be null for table:

I am using dbt Spark to load Iceberg tables into the Glue catalog. I don't have control over the SQL command that dbt generates to check table existence, so this error prevents me from proceeding.

Any help on this issue would be appreciated.