apache / incubator-xtable

Apache XTable (incubating) is a cross-table converter for lakehouse table formats that facilitates interoperability across data processing systems and query engines.
https://xtable.apache.org/
Apache License 2.0

Fails to Read SnowFlake Iceberg after Interoperating as Hudi and Delta in Spark #503

Closed · soumilshah1995 closed this issue 3 months ago

soumilshah1995 commented 3 months ago


Please describe the bug 🐞

Issue Description

I am encountering issues when attempting to read an Iceberg table created in Snowflake after interoperating it with Hudi and Delta. Below are the steps and configurations used.

Snowflake Configuration

Create External Volume:

CREATE OR REPLACE EXTERNAL VOLUME iceberg_external_volume
STORAGE_LOCATIONS = (
   (
      NAME = 'my-s3-us-east-1',
      STORAGE_PROVIDER = 'S3',
      STORAGE_BASE_URL = 's3://soumilshah-dev-1995/',
      STORAGE_AWS_ROLE_ARN = 'XX',
      STORAGE_AWS_EXTERNAL_ID = 'iceberg_table_external_id'
   )
);

Create Iceberg Table:

CREATE OR REPLACE ICEBERG TABLE tempdb.public.iceberg_people (
    id STRING,
    name STRING,
    age STRING,
    city STRING,
    create_ts STRING
)
CATALOG='SNOWFLAKE'
EXTERNAL_VOLUME='iceberg_external_volume'
BASE_LOCATION='snowflake_tables/iceberg_people';

INSERT INTO tempdb.public.iceberg_people (id, name, age, city, create_ts)
VALUES
   (1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
   (2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
   (3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
   (4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
   (5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
   (6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00');

my_config.yaml

sourceFormat: ICEBERG
targetFormats:
  - DELTA
  - HUDI
datasets:
  -
    tableBasePath: s3://soumilshah-dev-1995/snowflake_tables/iceberg_people
    tableDataPath: s3://soumilshah-dev-1995/snowflake_tables/iceberg_people/data
    tableName: iceberg_people
    namespace: tempdb.public


catalog.yaml

catalogImpl: org.apache.iceberg.snowflake.SnowflakeCatalog
catalogName: onetable
catalogOptions:
  io-impl: org.apache.iceberg.aws.s3.S3FileIO
  warehouse: s3://soumilshah-dev-1995/warehouse
  uri: jdbc:snowflake://XX.snowflakecomputing.com
  jdbc.user: XX
  jdbc.password: X

Ran the sync command (Docker CMD):

["java", "-cp", "iceberg-spark-runtime-3.3_2.12-1.4.2.jar:xtable-utilities-0.1.0-SNAPSHOT-bundled.jar:snowflake-jdbc-3.13.28.jar:iceberg-aws-1.4.2.jar:bundle-2.23.9.jar", "org.apache.xtable.utilities.RunSync", "--datasetConfig", "my_config.yaml", "--icebergCatalogConfig", "catalog.yaml"]

The sync ran perfectly. I then tried reading the Delta target from Spark:

path = "s3://soumilshah-dev-1995/snowflake_tables/iceberg_people/data"
spark.read.format("delta").load(path).createOrReplaceTempView("delta_snapshot")
query = f"SELECT * FROM delta_snapshot"
spark.sql(query).show(truncate=False)

error

Traceback (most recent call last):
  File "/home/glue_user/spark/python/pyspark/sql/dataframe.py", line 616, in show
    print(self._jdf.showString(n, int_truncate, vertical))
  File "/home/glue_user/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/home/glue_user/spark/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/home/glue_user/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o76.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 102) (e2185d40ca59 executor driver): org.apache.spark.sql.execution.QueryExecutionException: Encountered error while reading file s3://soumilshah-dev-1995/snowflake_tables/iceberg_people/data/snow_EMDhpK7cnFM_AJCcEFkA6Bc_0_1_002.parquet. Details: 
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:713)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:402)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:702)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:968)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:383)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:138)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.UnsupportedOperationException: Unsupported encoding: DELTA_LENGTH_BYTE_ARRAY
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.getValuesReader(VectorizedColumnReader.java:345)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.initDataReader(VectorizedColumnReader.java:309)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPageV2(VectorizedColumnReader.java:389)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.access$100(VectorizedColumnReader.java:49)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:281)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:268)
    at org.apache.parquet.column.page.DataPageV2.accept(DataPageV2.java:192)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPage(VectorizedColumnReader.java:268)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:186)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:331)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:227)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:393)
    ... 21 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:533)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:486)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3932)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2904)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3922)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3920)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3920)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2904)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:3125)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:290)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:329)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.execution.QueryExecutionException: Encountered error while reading file s3://soumilshah-dev-1995/snowflake_tables/iceberg_people/data/snow_EMDhpK7cnFM_AJCcEFkA6Bc_0_1_002.parquet. Details: 
    at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:713)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:402)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:227)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:702)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:968)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:383)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$

https://medium.com/@sagarlakshmipathy/how-to-interoperate-with-snowflake-and-open-table-formats-e70f453787a2

I tried the approach mentioned in the blog post above:

%%sql
CREATE EXTERNAL TABLE
  testdb.iceberg_people
  LOCATION  "s3://soumilshah-dev-1995/snowflake_tables/iceberg_people/"
  TBLPROPERTIES ('table_type' = 'DELTA');

Logs

An error was encountered:
Unable to infer the schema. The schema specification is required to create the table `testdb`.`iceberg_people`.
Traceback (most recent call last):
  File "/home/glue_user/spark/python/pyspark/sql/session.py", line 1034, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self)
  File "/home/glue_user/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/home/glue_user/spark/python/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Unable to infer the schema. The schema specification is required to create the table `testdb`.`iceberg_people`.

The method mentioned in the blog post didn't work.

I'm trying to read the tables as Delta and Hudi in Spark, and neither appears to work.
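
For reference, a sketch of how such a Hudi path-based read might look (added here as an illustration, not the reporter's exact code; it assumes the Hudi bundle is on the Spark classpath and that the Hudi metadata landed under the tableBasePath from my_config.yaml):

# Sketch only: reuse the existing Spark session ("spark") and the tableBasePath from my_config.yaml.
hudi_path = "s3://soumilshah-dev-1995/snowflake_tables/iceberg_people"
spark.read.format("hudi").load(hudi_path).createOrReplaceTempView("hudi_snapshot")
spark.sql("SELECT * FROM hudi_snapshot").show(truncate=False)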


soumilshah1995 commented 3 months ago

Reregistering tables via Glue Crawlers works, allowing me to read them in Athena.

Via Crawlers (Works)

Querying via Spark

Querying either the registered table or via path fails.


The same goes for the Hudi tables.

sagarlakshmipathy commented 3 months ago

Were you able to query the Iceberg table from Spark?
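
For reference, a sketch of how the source Snowflake-managed Iceberg table might be queried directly from Spark, reusing the SnowflakeCatalog settings from catalog.yaml above (added as an illustration; the catalog name and exact Spark configuration are assumptions, and the connection values are the placeholders from catalog.yaml):

from pyspark.sql import SparkSession

# Sketch: register an Iceberg catalog backed by Snowflake's catalog implementation,
# mirroring catalogImpl/catalogOptions from catalog.yaml.
spark = (
    SparkSession.builder.appName("read-snowflake-iceberg")
    .config("spark.sql.catalog.snowflake_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.snowflake_catalog.catalog-impl",
            "org.apache.iceberg.snowflake.SnowflakeCatalog")
    .config("spark.sql.catalog.snowflake_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.snowflake_catalog.uri", "jdbc:snowflake://XX.snowflakecomputing.com")
    .config("spark.sql.catalog.snowflake_catalog.jdbc.user", "XX")
    .config("spark.sql.catalog.snowflake_catalog.jdbc.password", "X")
    .getOrCreate()
)

# Namespace (tempdb.public) comes from my_config.yaml.
spark.sql("SELECT * FROM snowflake_catalog.tempdb.public.iceberg_people").show(truncate=False)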

the-other-tim-brown commented 3 months ago

The error is from the reader; you need to make sure your readers support Parquet v2 if you write with v2. See a similar Spark issue that was resolved in Spark 3.3.x: https://issues.apache.org/jira/browse/SPARK-36879
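
If upgrading the reader is not an option, a commonly suggested workaround (an assumption here, not something confirmed in this thread) is to disable Spark's vectorized Parquet reader so it falls back to the row-based parquet-mr reader, which can decode Parquet v2 pages such as DELTA_LENGTH_BYTE_ARRAY:

# Workaround sketch: trade vectorized-read performance for compatibility with Parquet v2 encodings.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

path = "s3://soumilshah-dev-1995/snowflake_tables/iceberg_people/data"
spark.read.format("delta").load(path).show(truncate=False)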

soumilshah1995 commented 3 months ago

Yes, I used Glue 4.0, which uses Spark 3.3. I assume it's due to that. Good to know.