Hudi follows MVCC and hence there is isolation between writers and readers. You should not see any such issues.
Hello nsivabalan, thank you for getting back to me.
All Hudi tables are stored in S3 buckets. We use Spark Structured Streaming to apply incremental updates against S3 Hudi datasets.
Stacktrace
org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to download file path: s3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet, range: 0-515243, partition values: [empty row], isDataPresent: false
at org.apache.spark.sql.execution.datasources.AsyncFileDownloader.next(AsyncFileDownloader.scala:142)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.getNextFile(FileScanRDD.scala:252)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:174)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:132)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory 's3://bucket_name/folder_name/table_name/some_partition/some_parquet_file.parquet'
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:473)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:694)
at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:449)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildPrefetcherWithPartitionValues$1(ParquetFileFormat.scala:492)
at org.apache.spark.sql.execution.datasources.AsyncFileDownloader.org$apache$spark$sql$execution$datasources$AsyncFileDownloader$$downloadFile(AsyncFileDownloader.scala:93)
at org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:73)
at org.apache.spark.sql.execution.datasources.AsyncFileDownloader$$anon$1.call(AsyncFileDownloader.scala:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Output Path: S3 path
hoodie.datasource.write.operation: upsert
parallelism: 3000
hoodie.datasource.write.table.type: COPY_ON_WRITE
hoodie.cleaner.policy: KEEP_LATEST_FILE_VERSIONS
File Version Retained: 1
hoodie.datasource.hive_sync.enable: false
SaveMode: Append
partitionBy: Single Column
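For context, the writer side looks roughly like the sketch below (the record key, precombine and partition columns are placeholders that are not spelled out in this thread; the option keys mirror the settings listed above):

import org.apache.spark.sql.SaveMode

df.write
  .format("org.apache.hudi")
  .option("hoodie.table.name", "table_name")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .option("hoodie.datasource.write.recordkey.field", "record_key")        // placeholder
  .option("hoodie.datasource.write.precombine.field", "ts")               // placeholder
  .option("hoodie.datasource.write.partitionpath.field", "partition_col") // the single partition column
  .option("hoodie.upsert.shuffle.parallelism", "3000")
  .option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS")
  .option("hoodie.cleaner.fileversions.retained", "1")
  .option("hoodie.datasource.hive_sync.enable", "false")
  .mode(SaveMode.Append)
  .save("s3://bucket_name/folder_name/table_name")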
To Reproduce
val df = ss.read
.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load("s3://path/to/hudi/table/*/*")
df.createOrReplaceTempView("hudi_table")
* Apply time-consuming Spark SQL queries against 'hudi_table'.
* A different Spark process updates the Hudi dataset incrementally.
* After the upsert completes, if the time-consuming query is still running, it crashes with the FileDownloadException / FileNotFoundException shown in the stack trace above.
Let me know if you need further details.
Thank you
@bvaradar @n3nash : let me take a stab. Let me know if my understanding is right: the customer sets file versions retained to 1. So, if there are two writes by the time a single lengthy query completes, we could hit this situation where the query throws FileNotFoundException, because the 2nd write would have deleted the 1st file version for all data files?
@Rap70r : in the meantime, do you necessarily need file versions retained set to 1? If not, can you try setting it to 3 and let us know if you can still reproduce the issue.
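Something along these lines on the writer side (just a sketch; these are the same cleaner keys as in the configs above):

df.write
  .format("org.apache.hudi")
  // keep the last 3 file versions so in-flight readers can still find the older files
  .option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS")
  .option("hoodie.cleaner.fileversions.retained", "3")
  // ...rest of the write options stay the same
  .mode(SaveMode.Append)
  .save("s3://bucket_name/folder_name/table_name")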
Hi nsivabalan,
I could try setting the retained file versions to a higher number, but the thing is we apply incremental updates very frequently, in short intervals, like every several minutes, while a reading process could be running for more than an hour. That is enough time for Hudi to update several times. So that might not solve the issue entirely for long running queries. Is there a different approach we could look into? Like any caching mechanism?
Thank you
Few options/questions:
Hi nsivabalan,
Thank you for your reply.
I was wondering if we should look into table caching in Spark: https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-aux-cache-cache-table.html
This would cache the entire table to memory/disk and we would work off of that. The only downside I can think of is space. Are there any other disadvantages to using cache or persist?
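For reference, I think that statement could be issued from the same Spark session against the temp view we register, something like this (the storage level here is just illustrative):

// eagerly cache the snapshot backing the temp view (Spark 3.0 syntax)
ss.sql("CACHE TABLE hudi_table OPTIONS ('storageLevel' 'MEMORY_AND_DISK')")
// ...run the long queries against the cached view...
ss.sql("UNCACHE TABLE hudi_table") // release memory/disk once done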
Also, we're looking into improving the readers' speed, in combination with increasing the retained file versions. When reading an S3 Hudi dataset, does the number of partitions affect reader speed? For example, if the table is partitioned into 200 folders or 1000 folders, by choosing different columns, would that affect the speed when reading the table with a snapshot query: https://hudi.apache.org/docs/querying_data.html#spark-snap-query
Thank you
So that might not solve the issue entirely for long running queries. Is there a different approach we could look into? Like any caching mechanism?
I wonder if this issue can be mitigated in your code by simply issuing a df.cache()? That way the recomputation of the dataframe is not triggered, even if the cleaning policy on the writer side deletes some older files. I am fairly confident that it might work, but of course it comes at the cost of additional memory and storage.
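Something like the following on the reader side (a sketch; the count() only forces materialization so later queries hit the cache instead of S3, and the storage level is illustrative):

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.storage.StorageLevel

val df = ss.read
  .format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load("s3://path/to/hudi/table/*/*")
  .persist(StorageLevel.MEMORY_AND_DISK)

df.count() // caching is lazy; materialize the snapshot now, before the writer's cleaner runs again
df.createOrReplaceTempView("hudi_table")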
we're looking into improving the readers' speed, in combination with increasing the retained file versions.
The metadata table we added in 0.7.0 should help alleviate concerns around listing larger partitions, although we have added support for Hive/SparkSQL-on-Hive only for now. We are working on support for the Spark datasource.
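If you want to experiment with it on the writer side, it is toggled by a single config (a sketch; please verify the key against the 0.7.0 docs for your version):

df.write
  .format("org.apache.hudi")
  // maintain Hudi's internal metadata table so file listings come from it instead of S3
  .option("hoodie.metadata.enable", "true")
  // ...rest of the write options stay the same
  .mode(SaveMode.Append)
  .save("s3://bucket_name/folder_name/table_name")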
if the table is partitioned into 200 folders or 1000 folders, by choosing different columns,
In general, the more folders, the smaller each file, so there will be some degradation (Hudi or not). W.r.t. partitions, I think it boils down to how S3 rate-limits per prefix; more prefixes may actually help increase parallelism.
In all, you want to do fast incremental updates with a retention of at least a few hours (so long-running jobs can finish), but your problem is that query performance degrades if you, say, keep cleaner retention for the last 10 hours?
Hi @vinothchandar,
Thank you for your detailed answer. Yes, we are going to increase the retention policy to a higher number, like 15 or maybe higher, and will also work on improving the performance of the readers. We wouldn't want a retention period that exceeds a few hours, for the exact reason you mentioned.
I did try to increase the number of partitions to a few thousand, but after a certain point the performance drops due to the time it takes to iterate over all the files with our cluster's setup.
I want to clarify that we are not using Hive in our setup. Hudi tables are all written to S3 directly by Spark.
Thank you
Sounds good. Please keep us posted.
Will do, Thank you :)
Once you respond with any questions/clarifications, can you please remove the "awaiting-user-response" label from the issue? If possible, add the "awaiting-community-help" label.
Closing this for now. Please feel free to reopen or open a new ticket.
Hello,
We have a setup where we process data incrementally against large Hudi tables in S3, using Hudi and Spark. When reading large tables from a different Spark process, or when applying time-consuming queries against Spark dataframes, the reading process crashes if another process attempts to update that table incrementally, presumably because the underlying parquet files are modified while the dataframe is still being queried. How can we isolate the table when reading and querying that dataframe in Spark, without being affected by the writers?
val ss = SparkSession.builder().getOrCreate()
val df = ss.read
  .format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load("s3://path/to/hudi/table/*")
df.createOrReplaceTempView("hudi_table")