apache / incubator-xtable

Apache XTable (incubating) is a cross-table converter for lakehouse table formats that facilitates interoperability across data processing systems and query engines.
https://xtable.apache.org/
Apache License 2.0
919 stars 147 forks source link

Polaris catalog sync failing with Delegate access to table with user-specified write location is temporarily not supported. #545

Closed sagarlakshmipathy closed 1 month ago

sagarlakshmipathy commented 2 months ago

Search before asking

Please describe the bug 🐞

I ran into an issue while using Snowflake's polaris catalog. Documenting here.

java -cp /Users/sagarl/Downloads/iceberg-spark-runtime-3.4_2.12-1.4.1.jar:/Users/sagarl/latest/incubator-xtable/xtable-utilities/target/xtable-utilities-0.2.0-SNAPSHOT-bundled.jar:/Users/sagarl/Downloads/bundle-2.20.160.jar:/Users/sagarl/Downloads/url-connection-client-2.20.160.jar org.apache.xtable.utilities.RunSync --datasetConfig config.yaml --icebergCatalogConfig catalog.yaml

Error

2024-09-20 22:55:30 INFO  org.apache.iceberg.RemoveSnapshots:328 - Cleaning up expired files (local, incremental)
2024-09-20 22:55:31 ERROR org.apache.xtable.spi.sync.TableFormatSync:78 - Failed to sync snapshot
org.apache.iceberg.exceptions.ForbiddenException: Forbidden: Delegate access to table with user-specified write location is temporarily not supported.
    at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:157) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:88) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:71) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:183) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:292) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:226) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.rest.HTTPClient.post(HTTPClient.java:337) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.rest.RESTClient.post(RESTClient.java:112) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.rest.RESTTableOperations.commit(RESTTableOperations.java:152) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.BaseTransaction.lambda$commitSimpleTransaction$3(BaseTransaction.java:416) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.BaseTransaction.commitSimpleTransaction(BaseTransaction.java:412) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:307) ~[iceberg-spark-runtime-3.4_2.12-1.4.1.jar:?]
    at org.apache.xtable.iceberg.IcebergConversionTarget.completeSync(IcebergConversionTarget.java:221) ~[xtable-utilities-0.2.0-SNAPSHOT-bundled.jar:0.2.0-SNAPSHOT]
    at org.apache.xtable.spi.sync.TableFormatSync.getSyncResult(TableFormatSync.java:165) ~[xtable-utilities-0.2.0-SNAPSHOT-bundled.jar:0.2.0-SNAPSHOT]
    at org.apache.xtable.spi.sync.TableFormatSync.syncSnapshot(TableFormatSync.java:70) [xtable-utilities-0.2.0-SNAPSHOT-bundled.jar:0.2.0-SNAPSHOT]
    at org.apache.xtable.conversion.ConversionController.syncSnapshot(ConversionController.java:182) [xtable-utilities-0.2.0-SNAPSHOT-bundled.jar:0.2.0-SNAPSHOT]
    at org.apache.xtable.conversion.ConversionController.sync(ConversionController.java:118) [xtable-utilities-0.2.0-SNAPSHOT-bundled.jar:0.2.0-SNAPSHOT]
    at org.apache.xtable.utilities.RunSync.main(RunSync.java:191) [xtable-utilities-0.2.0-SNAPSHOT-bundled.jar:0.2.0-SNAPSHOT]

The sync did not completely happen at this point meaning the table gets created in target format in the catalog, but doesn't have data in it.

config.yaml

sourceFormat: HUDI
targetFormats:
  - ICEBERG
datasets:
  -
    tableBasePath: s3://xtable-demo-bucket/spark_demo/people
    tableName: people
    partitionSpec: city:VALUE
    namespace: spark_demo

catalog.yaml

catalogImpl: org.apache.iceberg.rest.RESTCatalog
catalogName: iceberg_catalog
catalogOptions:
  io-impl: org.apache.iceberg.aws.s3.S3FileIO
  warehouse: iceberg_catalog
  uri: https://<polaris-id>.snowflakecomputing.com/polaris/api/catalog
  credential: <client-id>:<client-secret>
  header.X-Iceberg-Access-Delegation: vended-credentials
  scope: PRINCIPAL_ROLE:ALL
  client.region: us-west-2

I could access the table using spark-shell using command, so the table is very much created. I could also create a table directly from the spark shell if needed. So I can say spark writes work directly from outside SF, there is something wrong with the catalog sync for an existing table.

pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.1,software.amazon.awssdk:bundle:2.20.160,software.amazon.awssdk:url-connection-client:2.20.160 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.defaultCatalog=polaris \
--conf spark.sql.catalog.polaris=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.polaris.type=rest \
--conf spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation=vended-credentials \
--conf spark.sql.catalog.polaris.uri=https://<polaris-id>.snowflakecomputing.com/polaris/api/catalog \
--conf spark.sql.catalog.polaris.credential=<client-id>:<client-secret> \
--conf spark.sql.catalog.polaris.warehouse=iceberg_catalog \
--conf spark.sql.catalog.polaris.scope=PRINCIPAL_ROLE:my_spark_admin_role \
--conf spark.sql.catalog.polaris.client.region=us-west-2
>>> spark.sql("USE spark_demo")
DataFrame[]
>>> spark.sql("SHOW TABLES").show()
+----------+----------+-----------+                                             
| namespace| tableName|isTemporary|
+----------+----------+-----------+
|spark_demo|    people|      false|
|spark_demo|test_table|      false|
+----------+----------+-----------+

>>> spark.sql("SELECT * FROM people").show()
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+----+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|name|age|city|create_ts|
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+----+---------+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+----+---------+

>>> 

directly creating table using spark

spark.sql("USE spark_demo")

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define the schema for the records
schema = StructType([
   StructField("id", IntegerType(), True),
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True),
   StructField("city", StringType(), True),
   StructField("create_ts", StringType(), True)
])

# Create a DataFrame with the records
records = [
   (1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
   (2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
   (3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
   (4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
   (5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
   (6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]

df = spark.createDataFrame(records, schema)

spark.sql("""
CREATE TABLE people_via_spark (
    id INT,
    name STRING,
    age INT,
    city STRING,
    create_ts STRING
) USING iceberg
""")

df.writeTo("people_via_spark").append()
>>> spark.sql("SELECT * FROM people_via_spark").show()                          
+---+-------+---+----+-------------------+                                      
| id|   name|age|city|          create_ts|
+---+-------+---+----+-------------------+
|  1|   John| 25| NYC|2023-09-28 00:00:00|
|  2|  Emily| 30| SFO|2023-09-28 00:00:00|
|  3|Michael| 35| ORD|2023-09-28 00:00:00|
|  4| Andrew| 40| NYC|2023-10-28 00:00:00|
|  5|    Bob| 28| SEA|2023-09-23 00:00:00|
|  6|Charlie| 31| DFW|2023-08-29 00:00:00|
+---+-------+---+----+-------------------+

Are you willing to submit PR?

Code of Conduct

sagarlakshmipathy commented 2 months ago

This link says you could register the table using name and metadata-location in the request, in which name would be passed down by the tableName from the config.yaml (I think?) and I'm not sure what should be the value for metadata-location because it gets created based on the current sync status (because the version numbers could change)

the-other-tim-brown commented 1 month ago

This link says you could register the table using name and metadata-location in the request, in which name would be passed down by the tableName from the config.yaml (I think?) and I'm not sure what should be the value for metadata-location because it gets created based on the current sync status (because the version numbers could change)

The metadata location is the location of the iceberg metadata (/metadata/ in most cases)

the-other-tim-brown commented 1 month ago

@sagarlakshmipathy the Polaris catalog does not fully support the Iceberg spec yet so you will have to wait until they allow these user provided arguments: https://github.com/apache/polaris/blob/main/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java#L2002

sagarlakshmipathy commented 1 month ago

@the-other-tim-brown thanks for the link, I was having a hard time finding where that error was coming from the other day !!

This doesn't seem to be xtable related, closing this