apache / polaris

Apache Polaris, the interoperable, open source catalog for Apache Iceberg
https://polaris.apache.org/
Apache License 2.0
1.12k stars 120 forks source link

[FEATURE REQUEST] Add Support for S3A prefix #214

Open TheerachotTle opened 2 months ago

TheerachotTle commented 2 months ago

Is your feature request related to a problem? Please describe.

I have set the allowed location of the created catalog to S3 storage type using s3:// prefix. When I run remove_orphan_files procedure in Spark, it results in an error message: No FileSystem for scheme "s3". To solve this problem, I attempted to create the catalog with the s3a:// prefix, but I received a 400 Bad Request error with the message: Location prefix not allowed. Here's my spark configuration

spark = SparkSession.builder \
            .config("spark.jars.packages","org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2,org.apache.hadoop:hadoop-aws:3.4.0,org.apache.hadoop:hadoop-common:3.4.0") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config('spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation', 'true') \
            .config("spark.sql.catalog.polaris.uri", POLARIS_URI) \
            .config("spark.sql.catalog.polaris.type", "rest") \
            .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog") \
            .config("spark.sql.catalog.polaris.warehouse", POLARIS_CATALOG_NAME) \
            .config("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config('spark.sql.catalog.polaris.credential', POLARIS_CREDENTIALS) \
            .config('spark.sql.catalog.polaris.scope', POLARIS_SCOPE) \
            .config('spark.sql.catalog.polaris.token-refresh-enabled', 'true') \
            .getOrCreate()

Describe the solution you'd like

Probably add the s3a:// prefix as an alternative for the S3 storage type.

Describe alternatives you've considered

No response

Additional context

No response

flyrain commented 2 months ago

Do other DMLs(e.g., insert, delete)work? Can you share the stack of the error?

flyrain commented 2 months ago

Can you remove this config and try again?

            .config("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") 
TheerachotTle commented 2 months ago

Yes, the other DML commands work as expected, and I also removed the config above, but it still results in an error. This is the code I ran spark.sql("""CALL polaris.system.remove_orphan_files(table => 'polaris.namespace.table')""").show() Here's the error.

Py4JJavaError: An error occurred while calling o48.sql.
: java.io.UncheckedIOException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:386)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listedFileDS(DeleteOrphanFilesSparkAction.java:311)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.actualFileIdentDS(DeleteOrphanFilesSparkAction.java:296)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.doExecute(DeleteOrphanFilesSparkAction.java:247)
at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:59)
at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:51)
at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:130)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.execute(DeleteOrphanFilesSparkAction.java:223)
at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.lambda$call$3(RemoveOrphanFilesProcedure.java:185)
at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:107)
at org.apache.iceberg.spark.procedures.BaseProcedure.withIcebergTable(BaseProcedure.java:96)
at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.call(RemoveOrphanFilesProcedure.java:139)
at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:356)
... 55 more
eric-maynard commented 2 months ago

@TheerachotTle I think the issue is this config:

            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \

If you refer to the quickstart guide, it gives an example of Spark configs that can be used to connect to an Iceberg REST catalog.

Having said that, I think s3a support is a reasonable feature request

mayankvadariya commented 2 months ago

the other DML commands work as expected

if this is specific to remove_orphan_files, lets change the title to reflect it.

TheerachotTle commented 2 months ago

If you refer to the quickstart guide, it gives an example of Spark configs that can be used to connect to an Iceberg REST catalog.

Removing the config and it still doesn't work. From my understanding, the remove_orphan_files operation involves file listing to determine which files should be removed, and the Spark procedure uses Hadoop FS to perform listing operations.

if this is specific to remove_orphan_files, lets change the title to reflect it.

I have tried this procedure with other Iceberg catalogs, and it has the same problem when using the s3:// prefix. I'm not sure if the title should be changed to be about this procedure?

flyrain commented 2 months ago

the Spark procedure uses Hadoop FS to perform listing operations.

Yup, I'm guessing the failure is triggered due to procedure is using the Spark Hadoop FS while other DML commands use the FileIO from the iceberg catalog. It more likely a config thing than a bug, but I need to take a close look. Would you share a way to to reproduce it? for example, the spark version and config, and the command used to call the procedure.

TheerachotTle commented 2 months ago

I'm using spark 3.5.0 create catalog with POST request

{"name": "testcatalog", "type": "INTERNAL", "properties": {
        "default-base-location": "s3://bucket/folder/"
    },"storageConfigInfo": {
        "roleArn": "arn:aws:iam::xxxxxxxxx:role/demo-polaris",
        "storageType": "S3",
        "allowedLocations": [
            "s3://bucket/folder"
        ]
    } }

config of spark

spark = SparkSession.builder \
            .config("spark.jars.packages","org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2,org.apache.hadoop:hadoop-aws:3.4.0,org.apache.hadoop:hadoop-common:3.4.0") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config('spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation', 'true') \
            .config("spark.sql.catalog.polaris.uri", POLARIS_URI) \
            .config("spark.sql.catalog.polaris.type", "rest") \
            .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog") \
            .config("spark.sql.catalog.polaris.warehouse", POLARIS_CATALOG_NAME) \
            .config("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config('spark.sql.catalog.polaris.credential', POLARIS_CREDENTIALS) \
            .config('spark.sql.catalog.polaris.scope', POLARIS_SCOPE) \
            .config('spark.sql.catalog.polaris.token-refresh-enabled', 'true') \
            .getOrCreate()

code to reproduce

spark.sql("USE polaris")
spark.sql("USE NAMESPACE namespace1")
spark.sql("""CREATE TABLE IF NOT EXISTS table1 (
    id bigint NOT NULL COMMENT 'unique id',
    data string)
USING iceberg
LOCATION "s3://bucket/folder/namespace1/table1"
""")
spark.sql("INSERT INTO table1 VALUES (1,'test')")
spark.sql("""CALL polaris.system.remove_orphan_files(
  table => 'polaris.namespace1.table1'
  )
""").show()
flyrain commented 2 months ago

This is an Iceberg issue instead of a Polaris one. To summarize, DML commands and procedures usually use FileIO object provided by the catalog for read and write files. However, the procedure RemoveOrphanFile uses the Spark configuration to get the FileSystem object for listing, which is a Hadoop s3a File System. It couldn't recognize the s3://. Solutions would be

  1. Using catalog FileIO instead of the File System from Spark config. ResolvingFileIO is the default one used by REST catalog, which delegates to S3FileIO in this case, it supports listPrefix. This requires code change in the procedure.
  2. Using aws s3 client instead of Hadoop s3a client in Spark, I guess this only needs a config change, I'm not familiar with that though. Recommend to check with the Iceberg community.
anuragmantri commented 2 months ago

Here is another old thread on Iceberg slack about this issue

https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1636652647457600?thread_ts=1636639133.442800&cid=C025PH0G1D4

RemoveOrphanFiles is probably the only procedure that requires HadoopFileSystem in Iceberg, because it has to scan the entire storage and Iceberg’s FileIO interface as of today does not have a list-flavor API

Since listPrefix is now available, maybe we can update the procedure to use FileIO. I will create an issue in Iceberg.

anuragmantri commented 2 months ago

Oh great! There is already a PR for this. https://github.com/apache/iceberg/pull/7914

flyrain commented 2 months ago

Thanks @anuragmantri for chiming in. It'd be ideal to use Iceberg FileIO in removeOrphanFile, so that we don't have to config Spark file system differently, which is a duplication to avoid. I will take a look at the Iceberg PR.

We will still need a workaround at this moment though, as the Iceberg change and release will take a while. You can customize your iceberg lib of course, but not every user is able to do that. @dennishuo mentioned a workaround here. It doesn't work for me locally, but worth to try. cc @TheerachotTle

spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3a.S3A
yassan commented 2 months ago

How about replacing s3:// with s3a:// and configuring spark.sql.catalog.polaris.io-imp to use org.apache.iceberg.io.ResolvingFileIO ?

TheerachotTle commented 2 months ago

How about replacing s3:// with s3a://

Polaris doesn't allow me to create a catalog with this prefix.

spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem spark.hadoop.fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3a.S3A

With this config, I can use remove_orphan_files without any error.

flyrain commented 1 month ago

Let's document it before it is fixed in the Iceberg side, actually it should be documented in Iceberg side.