apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
6.51k stars 2.25k forks source link

DeleteOrphanFilesSparkAction.listDirRecursively - No FileSystem for scheme "s3" #10539

Open raphaelauv opened 5 months ago

raphaelauv commented 5 months ago

Apache Iceberg version

1.5.2 (latest release)

Query engine

Spark

Please describe the bug 🐞

when I do

spark.sql("""CALL rest.system.remove_orphan_files(table => 'default.taxi_dataset')""").show(
        truncate=False)

if fail with

 py4j.protocol.Py4JJavaError: An error occurred while calling o59.sql.
 : java.io.UncheckedIOException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
       at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:386)
       at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listedFileDS(DeleteOrphanFilesSparkAction.java:311)
       at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.actualFileIdentDS(DeleteOrphanFilesSparkAction.java:296)
       at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.doExecute(DeleteOrphanFilesSparkAction.java:247)
       at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:59)
       at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:51)
       at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:130)
       at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.execute(DeleteOrphanFilesSparkAction.java:223)
       at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.lambda$call$3(RemoveOrphanFilesProcedure.java:185)
       at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:107)
       at org.apache.iceberg.spark.procedures.BaseProcedure.withIcebergTable(BaseProcedure.java:96)
       at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.call(RemoveOrphanFilesProcedure.java:139)
       at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
       at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
       at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
       at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
       at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
       at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
       at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
       at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
       at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
       at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
       at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
       at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
       at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
       at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
       at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
       at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
       at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
       at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
       at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
       at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
       at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
       at java.base/java.lang.reflect.Method.invoke(Unknown Source)
       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
       at py4j.Gateway.invoke(Gateway.java:282)
       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
       at py4j.commands.CallCommand.execute(CallCommand.java:79)
       at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
       at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
       at java.base/java.lang.Thread.run(Unknown Source)
 Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
       at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
       at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
       at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
       at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
       at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
       at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
       at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
       at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:356)
       ... 55 more

but I do not have the problem with

    spark.sql("""CALL rest.system.rewrite_data_files(table => 'default.taxi_dataset')""").show(truncate=False)

my conf is

            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.path.style.access", "true") \
            .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
            .config("spark.hadoop.fs.s3a.endpoint.region", "eu-west-3") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog") \
            .config("spark.sql.catalog.rest.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") \
            .config("spark.sql.catalog.rest.uri", "http://iceberg_rest:8181") \
            .config("spark.sql.catalog.rest.s3.endpoint", "http://minio:9000") \
            .config("spark.sql.catalog.rest.s3.path-style-access", "true") \
            .config("spark.sql.catalog.rest.s3.endpoint.region", "eu-west-3") \
            .config("spark.sql.catalog.rest.default-namespace", "default") \
            .config("spark.sql.catalog.rest.warehouse", "s3://test-bucket/") \
            .config("spark.sql.defaultCatalog", "rest")
raphaelauv commented 5 months ago

look like it's related to https://github.com/apache/iceberg/pull/7914