sid-habu opened this issue 3 months ago
Apologies for posting this here, as I am having trouble joining the Delta Slack workspace.
Stack trace while accessing the Delta table in GCS:
24/08/07 07:52:03 INFO Snapshot: [tableId=d79ce5e3-8704-458f-bd5e-7f8bd8a4df8b] Created snapshot Snapshot(path=gs://xxxxxxx/_delta_log, version=0, metadata=Metadata(4e41b027-0687-4eec-847a-7de42711db06,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"gender","type":"string","nullable":true,"metadata":{}},{"name":"date_of_birth","type":"string","nullable":true,"metadata":{}},{"name":"email","type":"string","nullable":true,"metadata":{}},{"name":"first_name","type":"string","nullable":true,"metadata":{}},{"name":"customer_id","type":"integer","nullable":true,"metadata":{}},{"name":"phone_number","type":"string","nullable":true,"metadata":{}},{"name":"last_name","type":"string","nullable":true,"metadata":{}},{"name":"zip_code","type":"integer","nullable":true,"metadata":{}},{"name":"state","type":"string","nullable":true,"metadata":{}},{"name":"signup_date","type":"string","nullable":true,"metadata":{}},{"name":"city","type":"string","nullable":true,"metadata":{}}]},List(),Map(),Some(1722293863838)), logSegment=LogSegment(gs://xxxxxx/_delta_log,0,WrappedArray(FileStatus{path=gs://xxxxxx/_delta_log/00000000000000000000.json; isDirectory=false; length=2505; replication=3; blocksize=67108864; modification_time=1722827977842; access_time=1722827977842; owner=spark; group=spark; permission=rwx------; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}),org.apache.spark.sql.delta.EmptyCheckpointProvider$@7c47ae7d,1722827977842), checksumOpt=None)
24/08/07 07:52:03 INFO DeltaLog: Updated snapshot to Snapshot(path=gs://xxxxxxx/_delta_log, version=0, metadata=Metadata(4e41b027-0687-4eec-847a-7de42711db06,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"gender","type":"string","nullable":true,"metadata":{}},{"name":"date_of_birth","type":"string","nullable":true,"metadata":{}},{"name":"email","type":"string","nullable":true,"metadata":{}},{"name":"first_name","type":"string","nullable":true,"metadata":{}},{"name":"customer_id","type":"integer","nullable":true,"metadata":{}},{"name":"phone_number","type":"string","nullable":true,"metadata":{}},{"name":"last_name","type":"string","nullable":true,"metadata":{}},{"name":"zip_code","type":"integer","nullable":true,"metadata":{}},{"name":"state","type":"string","nullable":true,"metadata":{}},{"name":"signup_date","type":"string","nullable":true,"metadata":{}},{"name":"city","type":"string","nullable":true,"metadata":{}}]},List(),Map(),Some(1722293863838)), logSegment=LogSegment(gs://xxxxx/delta/_delta_log,0,WrappedArray(FileStatus{path=gs://xxxxxxx/_delta_log/00000000000000000000.json; isDirectory=false; length=2505; replication=3; blocksize=67108864; modification_time=1722827977842; access_time=1722827977842; owner=spark; group=spark; permission=rwx------; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}),org.apache.spark.sql.delta.EmptyCheckpointProvider$@7c47ae7d,1722827977842), checksumOpt=None)
24/08/07 07:52:03 INFO FieldMapperImpl: Generated Schema from Data Connection Field Mapping: StructType(StructField(gender,StringType,true),StructField(date_of_birth,StringType,true),StructField(email,StringType,true),StructField(first_name,StringType,true),StructField(customer_id,IntegerType,true),StructField(phone_number,StringType,true),StructField(last_name,StringType,true),StructField(zip_code,IntegerType,true),StructField(state,StringType,true),StructField(signup_date,StringType,true),StructField(city,StringType,true))
24/08/07 07:52:03 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.3 KiB, free 7.0 GiB)
24/08/07 07:52:03 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 843.0 B, free 7.0 GiB)
24/08/07 07:52:03 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on tee-partner-data-d6178-1-36300-e093da912bd25b06-driver-svc.cleanroom.svc:7079 (size: 843.0 B, free: 7.0 GiB)
24/08/07 07:52:03 INFO SparkContext: Created broadcast 2 from broadcast at FieldMapperImpl.scala:47
24/08/07 07:52:03 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 10.3.2.71:46775 in memory (size: 40.4 KiB, free: 28.6 GiB)
24/08/07 07:52:03 INFO BlockManagerInfo: Removed broadcast_0_piece0 on tee-partner-data-d6178-1-36300-e093da912bd25b06-driver-svc.cleanroom.svc:7079 in memory (size: 40.4 KiB, free: 7.0 GiB)
24/08/07 07:52:03 INFO TEEDataHarmonizer$: filterLists: 0
24/08/07 07:52:03 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.3.2.71:46775 in memory (size: 17.1 KiB, free: 28.6 GiB)
24/08/07 07:52:03 INFO BlockManagerInfo: Removed broadcast_1_piece0 on tee-partner-data-d6178-1-36300-e093da912bd25b06-driver-svc.cleanroom.svc:7079 in memory (size: 17.1 KiB, free: 7.0 GiB)
24/08/07 07:52:04 INFO PrepareDeltaScan: DELTA: Filtering files for query
24/08/07 07:52:04 INFO Snapshot: [tableId=4e41b027-0687-4eec-847a-7de42711db06] DELTA: Compute snapshot for version: 0
24/08/07 07:52:04 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 233.4 KiB, free 7.0 GiB)
24/08/07 07:52:04 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 40.4 KiB, free 7.0 GiB)
24/08/07 07:52:04 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on tee-partner-data-d6178-1-36300-e093da912bd25b06-driver-svc.cleanroom.svc:7079 (size: 40.4 KiB, free: 7.0 GiB)
24/08/07 07:52:04 INFO SparkContext: Created broadcast 3 from $anonfun$recordDeltaOperationInternal$1 at DatabricksLogging.scala:128
24/08/07 07:52:05 INFO DataSourceStrategy: Pruning directories with:
24/08/07 07:52:05 INFO FileSourceStrategy: Pushed Filters:
24/08/07 07:52:05 INFO FileSourceStrategy: Post-Scan Filters:
24/08/07 07:52:05 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/08/07 07:52:05 INFO CodeGenerator: Code generated in 143.618869 ms
24/08/07 07:52:05 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 233.7 KiB, free 7.0 GiB)
24/08/07 07:52:05 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 40.4 KiB, free 7.0 GiB)
24/08/07 07:52:05 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on tee-partner-data-d6178-1-36300-e093da912bd25b06-driver-svc.cleanroom.svc:7079 (size: 40.4 KiB, free: 7.0 GiB)
24/08/07 07:52:05 INFO SparkContext: Created broadcast 4 from $anonfun$recordDeltaOperationInternal$1 at DatabricksLogging.scala:128
24/08/07 07:52:05 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
24/08/07 07:52:05 INFO DAGScheduler: Registering RDD 7 ($anonfun$recordDeltaOperationInternal$1 at DatabricksLogging.scala:128) as input to shuffle 0
24/08/07 07:52:05 INFO DAGScheduler: Got map stage job 1 ($anonfun$recordDeltaOperationInternal$1 at DatabricksLogging.scala:128) with 1 output partitions
24/08/07 07:52:05 INFO DAGScheduler: Final stage: ShuffleMapStage 1 ($anonfun$recordDeltaOperationInternal$1 at DatabricksLogging.scala:128)
24/08/07 07:52:05 INFO DAGScheduler: Parents of final stage: List()
24/08/07 07:52:05 INFO DAGScheduler: Missing parents: List()
24/08/07 07:52:05 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[7] at $anonfun$recordDeltaOperationInternal$1 at DatabricksLogging.scala:128), which has no missing parents
24/08/07 07:52:05 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 105.9 KiB, free 7.0 GiB)
24/08/07 07:52:05 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 32.7 KiB, free 7.0 GiB)
24/08/07 07:52:05 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on tee-partner-data-d6178-1-36300-e093da912bd25b06-driver-svc.cleanroom.svc:7079 (size: 32.7 KiB, free: 7.0 GiB)
24/08/07 07:52:05 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1585
24/08/07 07:52:05 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[7] at $anonfun$recordDeltaOperationInternal$1 at DatabricksLogging.scala:128) (first 15 tasks are for partitions Vector(0))
24/08/07 07:52:05 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
24/08/07 07:52:05 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (10.3.2.72, executor 8, partition 0, PROCESS_LOCAL, 8509 bytes)
24/08/07 07:52:05 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 10.3.2.72:46213 (size: 32.7 KiB, free: 28.6 GiB)
24/08/07 07:52:07 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.3.2.72:46213 (size: 40.4 KiB, free: 28.6 GiB)
24/08/07 07:52:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1) (10.3.2.72 executor 8): java.io.IOException: Error accessing gs://xxxxxxx/_delta_log/00000000000000000000.json
Caused by: java.io.IOException: Error accessing gs://xxxx/delta/_delta_log/00000000000000000000.json
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:2231)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:2121)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:801)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:345)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:112)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.lambda$open$2(GoogleHadoopFileSystemBase.java:607)
at com.google.cloud.hadoop.fs.gcs.GhfsStorageStatistics.trackDuration(GhfsStorageStatistics.java:102)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:592)
at org.apache.hadoop.fs.FileSystem.lambda$openFileWithOptions$0(FileSystem.java:4633)
at org.apache.hadoop.util.LambdaUtils.eval(LambdaUtils.java:52)
at org.apache.hadoop.fs.FileSystem.openFileWithOptions(FileSystem.java:4631)
at org.apache.hadoop.fs.FileSystem$FSDataInputStreamBuilder.build(FileSystem.java:4768)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:92)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.readFile(JsonDataSource.scala:132)
at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.$anonfun$buildReader$2(JsonFileFormat.scala:126)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:160)
at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:217)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:279)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 Forbidden
GET https://storage.googleapis.com/storage/v1/b/xxxxxx/delta%2F_delta_log%2F00000000000000000000.json?fields=bucket,name,timeCreated,updated,generation,metageneration,size,contentType,contentEncoding,md5Hash,crc32c,metadata
Describe the problem
I am trying to read a Delta table from S3 using Spark on Kubernetes and am running into permission issues while reading the transaction log.
Here is my code and conf in the Spark session. This fails only for the Delta format; the same conf successfully reads Parquet files from the same S3 bucket. A similar access-denied error occurs if I try to read the same Delta table from a GCS bucket.
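For reference, the Spark session setup looks roughly like the following sketch; the service-account key path, bucket, and table path below are placeholders rather than the actual values from my job, and the GCS connector and delta-spark jars are assumed to already be on the classpath.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("delta-gcs-read")
  // Delta Lake wiring
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  // GCS connector wiring (keys as documented for the gcs-connector)
  .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
  .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
  .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
  .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/secrets/gcs-key.json") // placeholder path
  .getOrCreate()

// Reading Parquet from the same bucket works with this conf,
// but the Delta read fails with 403 while opening the transaction log.
val df = spark.read.format("delta").load("gs://my-bucket/delta") // placeholder path
df.show(5)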
Error: see the stack trace at the top of this post.
Environment information
Since this fails only for the Delta format, and since the reader can list the root folder and reach the transaction log, my hunch is that the Delta data source reads the filesystem config from the Spark session differently when it interacts with the transaction log. Is that possible?
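To test this hunch, something like the following sketch could compare a driver-side read of the log file (using the Hadoop FileSystem API directly) against an executor-side read (a plain Spark text scan, which takes the same JSON/text code path as the failing stage). The gs:// path is a placeholder. If the driver-side read succeeds while the executor-side read fails with the same 403, that would point at the Hadoop/FS credentials not reaching the executors rather than at Delta itself.

import org.apache.hadoop.fs.{FileSystem, Path}

val logFile = new Path("gs://my-bucket/delta/_delta_log/00000000000000000000.json") // placeholder

// (a) Driver-side read via the Hadoop FileSystem API and the session's hadoopConfiguration.
val fs = logFile.getFileSystem(spark.sparkContext.hadoopConfiguration)
val in = fs.open(logFile)
try {
  scala.io.Source.fromInputStream(in).getLines().take(3).foreach(println)
} finally {
  in.close()
}

// (b) Executor-side read: same kind of text/JSON scan that fails inside the Delta snapshot computation.
spark.read.text(logFile.toString).show(3, truncate = false)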