apache / incubator-xtable

Apache XTable (incubating) is a cross-table converter for lakehouse table formats that facilitates interoperability across data processing systems and query engines.
https://xtable.apache.org/
Apache License 2.0

RuntimeIOException when reading iceberg target table generated from hudi source #409

Closed: rahul-ghiware closed this issue 2 months ago

rahul-ghiware commented 2 months ago

Using Spark 3.4.0, Scala 2.12, Hudi 0.14.0, and Iceberg Spark runtime 1.4.2. Java Spark code used to read the table:

SparkSession spark = sessionBuilder.appName("test iceberg read")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
        .config("spark.sql.catalog.spark_catalog.type", "hive")
        .config("spark.sql.catalog.test_iceberg", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.test_iceberg.type", "hadoop")
        .config("spark.sql.defaultCatalog", "test_iceberg")
        .config("spark.sql.catalog.test_iceberg.warehouse", "s3a://mq-de-qa/TEST_DATA/hudi-warehouse/")
        .getOrCreate();
Dataset<Row> dataFrame = spark.read().format("iceberg").load("people");
dataFrame.printSchema();
dataFrame.show(false);


- Getting a RuntimeIOException due to `org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"`:

org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://mq-de-qa/TEST_DATA/hudi-warehouse/people/metadata/snap-8390830730587993100-1-ace15a3f-1f45-4585-ba2e-3e9b6858c6c8.avro
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:58)
        at org.apache.iceberg.hadoop.HadoopInputFile.fromLocation(HadoopInputFile.java:56)
        at org.apache.iceberg.hadoop.HadoopFileIO.newInputFile(HadoopFileIO.java:87)
        at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:146)
        at org.apache.iceberg.BaseSnapshot.deleteManifests(BaseSnapshot.java:180)
        at org.apache.iceberg.BaseDistributedDataScan.findMatchingDeleteManifests(BaseDistributedDataScan.java:207)
        at org.apache.iceberg.BaseDistributedDataScan.doPlanFiles(BaseDistributedDataScan.java:148)
        at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139)
        at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.tasks(SparkPartitioningAwareScan.java:174)
        at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.taskGroups(SparkPartitioningAwareScan.java:202)
        at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.outputPartitioning(SparkPartitioningAwareScan.java:104)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$$anonfun$partitioning$1.applyOrElse(V2ScanPartitioningAndOrdering.scala:44)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$$anonfun$partitioning$1.applyOrElse(V2ScanPartitioningAndOrdering.scala:42)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
        at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:69)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
        at org.apache.spark.sql.catalyst.plans.logical.LocalLimit.mapChildren(basicLogicalOperators.scala:1563)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
        at org.apache.spark.sql.catalyst.plans.logical.GlobalLimit.mapChildren(basicLogicalOperators.scala:1542)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.partitioning(V2ScanPartitioningAndOrdering.scala:42)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.$anonfun$apply$1(V2ScanPartitioningAndOrdering.scala:35)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.$anonfun$apply$3(V2ScanPartitioningAndOrdering.scala:38)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.apply(V2ScanPartitioningAndOrdering.scala:37)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.apply(V2ScanPartitioningAndOrdering.scala:33)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:143)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:139)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:135)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:811)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:788)



- The same Java Spark code runs fine when reading an original Iceberg source table.
- It seems I'm missing some options needed to read the Iceberg target table. Can someone please assist me?
the-other-tim-brown commented 2 months ago

@rahul-ghiware can you also add "spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
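
A minimal sketch of how that property could be added to the sessionBuilder used above (illustrative only; it assumes the hadoop-aws module providing S3AFileSystem is already on the classpath):

// Sketch: map both the bare "s3" scheme and "s3a" to the S3A filesystem implementation,
// so paths written as s3://... can still be read through S3A.
sessionBuilder = sessionBuilder
        .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");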

rahul-ghiware commented 2 months ago

@the-other-tim-brown I did add the given configuration to the SparkSession, as you can see in the code snippet, but it is still throwing the exception.

dipankarmazumdar commented 2 months ago

@rahul-ghiware - Can you try setting this: 'spark.sql.catalog.test_catalog.io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'? Since you are using a Hadoop catalog with Iceberg and reading/writing data in S3/cloud storage, it could be looking for that class.
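
As an illustration of that suggestion applied to the session above (a sketch, assuming the iceberg-aws module and the AWS SDK bundle are on the classpath; the catalog name inside the key has to match the catalog actually being queried):

// Sketch: switch the Iceberg catalog's FileIO from the default HadoopFileIO to S3FileIO.
// "test_iceberg" here refers to the catalog name defined in the SparkSession config above.
sessionBuilder = sessionBuilder
        .config("spark.sql.catalog.test_iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");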

the-other-tim-brown commented 2 months ago

> @the-other-tim-brown I did add the given configuration to the SparkSession, as you can see in the code snippet, but it is still throwing the exception.

Hi @rahul-ghiware, can you update with the latest code you tried then? You are setting s3a but not s3 in your code above.

rahul-ghiware commented 2 months ago

> @rahul-ghiware - Can you try setting this: 'spark.sql.catalog.test_catalog.io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'? Since you are using a Hadoop catalog with Iceberg and reading/writing data in S3/cloud storage, it could be looking for that class.

Adding the above-mentioned setting did not help; I'm still getting the same exception. Latest Java Spark code:

SparkSession.Builder sessionBuilder = SparkSession.builder().master("local[1]")
                .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-1.amazonaws.com");
        sessionBuilder = sessionBuilder.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        SparkSession spark = sessionBuilder.appName("test iceberg read")
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
                .config("spark.sql.catalog.spark_catalog.type", "hive")
                .config("spark.sql.catalog.test_iceberg", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.test_iceberg.type","hadoop")
                .config("spark.sql.defaultCatalog","test_iceberg")
                .config("spark.sql.catalog.test_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
                .config("spark.sql.catalog.test_iceberg.warehouse","s3a://mq-de-qa/TEST_DATA/hudi-warehouse/").getOrCreate();
        Dataset<Row> dataFrame = spark.read().format("iceberg").load("people");
        dataFrame.printSchema();
        dataFrame.show(false);

rahul-ghiware commented 2 months ago

> @rahul-ghiware - Can you try setting this: 'spark.sql.catalog.test_catalog.io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'? Since you are using a Hadoop catalog with Iceberg and reading/writing data in S3/cloud storage, it could be looking for that class.

The above setting is not needed to read the original Iceberg source table; I'm able to load the original Iceberg source table with HadoopFileIO.

the-other-tim-brown commented 2 months ago

> @rahul-ghiware can you also add "spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"

@rahul-ghiware I think you missed my message. Note that this is the s3 filesystem and not s3a. Let me know if adding this in addition to the s3a configuration helps.

rahul-ghiware commented 2 months ago

> @rahul-ghiware can you also add "spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"

> @rahul-ghiware I think you missed my message. Note that this is the s3 filesystem and not s3a. Let me know if adding this in addition to the s3a configuration helps.

@the-other-tim-brown I'm using "spark.hadoop.fs.s3a.impl" = "org.apache.hadoop.fs.s3a.S3AFileSystem", hence the S3 location is given with s3a.

the-other-tim-brown commented 2 months ago

@rahul-ghiware please post the updated stack trace and configs. You reference s3 in the yaml mentioned at the top of this issue, and furthermore the stack trace is `org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://`, so it looks like you need to define how Hadoop will handle the s3 filesystem.

Can you humor me and just try it? If not, I am not sure how I can help you.

rahul-ghiware commented 2 months ago

> @rahul-ghiware please post the updated stack trace and configs. You reference s3 in the yaml mentioned at the top of this issue, and furthermore the stack trace is `org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://`, so it looks like you need to define how Hadoop will handle the s3 filesystem.

> Can you humor me and just try it? If not, I am not sure how I can help you.

The code below works fine for the original Iceberg source table without any issue. It throws the exception only for the Iceberg target table.

SparkSession.Builder sessionBuilder = SparkSession.builder().master("local[1]")
                .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-1.amazonaws.com");
        sessionBuilder = sessionBuilder.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        SparkSession spark = sessionBuilder.appName("test iceberg read")
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
                .config("spark.sql.catalog.spark_catalog.type", "hive")
                .config("spark.sql.catalog.test_iceberg", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.test_iceberg.type","hadoop")
                .config("spark.sql.defaultCatalog","test_iceberg")
                .config("spark.sql.catalog.test_iceberg.warehouse","s3a://mq-de-qa/TEST_DATA/iceberg-warehouse/")
                .getOrCreate();
        Dataset<Row> dataFrame = spark.read().format("iceberg").load("people");
        dataFrame.printSchema();
        dataFrame.show(false);

The above code throws the exception when trying to read the Iceberg target table. It is able to print the table schema, but it fails when trying to show records (while trying to access an .avro file from the metadata):

org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://mq-de-qa/TEST_DATA/hudi-warehouse/people/metadata/snap-8390830730587993100-1-ace15a3f-1f45-4585-ba2e-3e9b6858c6c8.avro
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:58)
        at org.apache.iceberg.hadoop.HadoopInputFile.fromLocation(HadoopInputFile.java:56)
        at org.apache.iceberg.hadoop.HadoopFileIO.newInputFile(HadoopFileIO.java:87)
        at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:146)
        at org.apache.iceberg.BaseSnapshot.deleteManifests(BaseSnapshot.java:180)
        at org.apache.iceberg.BaseDistributedDataScan.findMatchingDeleteManifests(BaseDistributedDataScan.java:207)
        at org.apache.iceberg.BaseDistributedDataScan.doPlanFiles(BaseDistributedDataScan.java:148)
        at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139)
        at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.tasks(SparkPartitioningAwareScan.java:174)
        at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.taskGroups(SparkPartitioningAwareScan.java:202)
        at org.apache.iceberg.spark.source.SparkPartitioningAwareScan.outputPartitioning(SparkPartitioningAwareScan.java:104)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$$anonfun$partitioning$1.applyOrElse(V2ScanPartitioningAndOrdering.scala:44)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$$anonfun$partitioning$1.applyOrElse(V2ScanPartitioningAndOrdering.scala:42)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
        at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:69)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
        at org.apache.spark.sql.catalyst.plans.logical.LocalLimit.mapChildren(basicLogicalOperators.scala:1563)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
        at org.apache.spark.sql.catalyst.plans.logical.GlobalLimit.mapChildren(basicLogicalOperators.scala:1542)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.partitioning(V2ScanPartitioningAndOrdering.scala:42)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.$anonfun$apply$1(V2ScanPartitioningAndOrdering.scala:35)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.$anonfun$apply$3(V2ScanPartitioningAndOrdering.scala:38)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.apply(V2ScanPartitioningAndOrdering.scala:37)
        at org.apache.spark.sql.execution.datasources.v2.V2ScanPartitioningAndOrdering$.apply(V2ScanPartitioningAndOrdering.scala:33)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:143)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:139)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:135)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:153)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:168)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:221)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:266)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:235)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:112)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:811)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:788)
        at com.marqeta.ddp.pipeline.platform.SparkSnowflakeConnectorTest.testIcebergRead(SparkSnowflakeConnectorTest.java:294)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
        at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
        at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
        at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
        at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
        at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
        at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
        at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
        at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
        at org.apache.maven.surefire.junitplatform.LazyLauncher.execute(LazyLauncher.java:55)
        at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:223)
        at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:175)
        at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:139)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:456)
        at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:169)
        at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:595)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:581)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:56)

the-other-tim-brown commented 2 months ago
[image]

@rahul-ghiware you need to specify a filesystem for scheme "s3" like the stack trace mentions.

rahul-ghiware commented 2 months ago
> [image]

> @rahul-ghiware you need to specify a filesystem for scheme "s3" like the stack trace mentions.

@the-other-tim-brown The given code works fine with the original Iceberg source table (i.e. s3a://mq-de-qa/TEST_DATA/iceberg-warehouse/people/). Not sure if this is a problem only with the Iceberg target table (generated from the Hudi source using XTable).

the-other-tim-brown commented 2 months ago

@rahul-ghiware you specify the base path as s3 and not s3a, so why do you expect the output paths to have s3a?

[image]

You can use the code I suggested to read from an s3:// path with the s3a file system.

rahul-ghiware commented 2 months ago

> @rahul-ghiware - Can you try setting this: 'spark.sql.catalog.test_catalog.io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'? Since you are using a Hadoop catalog with Iceberg and reading/writing data in S3/cloud storage, it could be looking for that class.

Thank you @dipankarmazumdar and @the-other-tim-brown. I was able to read the Iceberg target table after adding the following config to the code:

sessionBuilder = sessionBuilder.config("spark.sql.catalog.test_iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
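
Putting the pieces from this thread together, a session that reads the Iceberg target table would look roughly like the sketch below (endpoint, warehouse path, and credentials are environment-specific, and the iceberg-spark-runtime, hadoop-aws, and iceberg-aws/AWS SDK dependencies are assumed to be on the classpath):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch of the configuration reported as working above: S3A for s3a:// paths,
// plus S3FileIO on the test_iceberg catalog so Iceberg metadata on S3 can be read.
SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .appName("test iceberg read")
        .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
        .config("spark.sql.catalog.spark_catalog.type", "hive")
        .config("spark.sql.catalog.test_iceberg", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.test_iceberg.type", "hadoop")
        .config("spark.sql.catalog.test_iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .config("spark.sql.catalog.test_iceberg.warehouse", "s3a://mq-de-qa/TEST_DATA/hudi-warehouse/")
        .config("spark.sql.defaultCatalog", "test_iceberg")
        .getOrCreate();

Dataset<Row> dataFrame = spark.read().format("iceberg").load("people");
dataFrame.show(false);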