apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Running out of disk space for large input dataset #577

Closed pshivkumar closed 5 years ago

pshivkumar commented 5 years ago

I am using a 10-node r3.8xlarge cluster (640 GB SSD and 244 GB memory per node). I was able to load data files of less than 1 GB in under 15 minutes without any issues, but I am running into problems when I try to load a 64 GB dataset (stored as an HDFS Parquet table) into a Hudi table (tableType COPY_ON_WRITE) on HDFS. I have tried loading this data into Hudi both by splitting it into 50 chunks using randomSplit and as one single dataset. My Spark job fails with the exception provided below, and disk space is reclaimed after that. Metrics show that the remaining disk capacity gradually comes down (over 28 hours) from almost 6 TB to 100 GB, and that is when the job fails.

I am not providing any partition columns, and I am using a custom key generator (the hoodie key is a concatenation of two column values joined with ~) because this table has a composite key. The dataset has 207 columns.
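For reference, the key generator does roughly the following (a simplified sketch; class, package, and field names are illustrative, and the exact com.uber.hoodie base-class signature may differ from the real code):

    import com.uber.hoodie.KeyGenerator
    import com.uber.hoodie.common.model.HoodieKey
    import com.uber.hoodie.common.util.TypedProperties
    import org.apache.avro.generic.GenericRecord

    // Simplified sketch: build the record key by joining two configured fields with "~".
    class CompositeKeyGenerator(props: TypedProperties) extends KeyGenerator(props) {

      // e.g. "col_a,col_b" passed in through the record-key write option
      private val keyFields: Array[String] =
        props.getString("hoodie.datasource.write.recordkey.field").split(",").map(_.trim)

      override def getKey(record: GenericRecord): HoodieKey = {
        val recordKey = keyFields.map(f => String.valueOf(record.get(f))).mkString("~")
        // Non-partitioned table, so the partition path is left empty.
        new HoodieKey(recordKey, "")
      }
    }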

I am setting HoodieCompactionConfig.CLEANER_FILE_VERSIONS_RETAINED_PROP to 1, the cleaning policy to KEEP_LATEST_FILE_VERSIONS, and hoodie.insert.shuffle.parallelism and hoodie.upsert.shuffle.parallelism to 1000.

Spark arguments: --executor-memory 2G --driver-memory 4G --executor-cores 1 --conf fs.hdfs.impl.disable.cache=true --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

Below is the exception:

Caused by: java.io.IOException: No space left on device
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:252)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:209)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:416)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:230)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I appreciate any pointers; let me know if you need additional data. I am retrying the same job now after dividing the input dataset into 1000 chunks, but that run has been going for too long already.

n3nash commented 5 years ago

@pshivkumar How many executors are you using? Essentially, --num-executors is a Spark parameter with which you can set the number of executors. From the stack trace, I can say the following is happening: Spark has a shuffle service that shuffles the data and writes it to the local file system so that Spark can read it back. If you are using a very low number of executors, all of the data gets shuffled to that small set of nodes, leading to out-of-disk errors on those nodes. If your executor count is lower than 10 for this job, try bumping it up.

pshivkumar commented 5 years ago

@n3nash Thanks for the quick response. I haven't set that explicitly. I will run the job with --num-executors set to 20 and update once it completes.

vinothchandar commented 5 years ago

If you can give us a screenshot of the Spark UI, it will be super helpful.

My guess is, your /tmp is pretty small.. you can try placing spark shuffle data in one of your larger disk partitions using --conf spark.local.dir=/path/to/large/partition ?

pshivkumar commented 5 years ago

I have increased the number of executors to 20. The job has been running for over 3 days, and the capacity remaining on the cluster has come down from 5 TB to around 2.5 TB. So it does not look like a size limitation of the tmp directory, since the job usually fails when overall cluster utilization is close to 100%. In my EMR cluster, /tmp points to /mnt/tmp, which has about 300 GB allocated on each node. I will still try setting that local directory in the next run and also provide a Spark UI screenshot (I can access YARN but not the Spark UI because of an authentication issue). I have attached the Spark application stage from the EMR console: Google drive link

Could you provide some insight into how large the datasets you process at Uber are and what cluster you use for them? This will help me understand whether I am under-provisioning.

Below are more details about the job and utilization:

spark-submit arguments:

Main class: CompactionTool

Jar Path: s3://xyz/jars/hudi-code.jar s3://xyz/jars/hudi-compaction-split.jar

--executor-memory 2G --driver-memory 4G --executor-cores 1 --num-executors 20 --conf fs.hdfs.impl.disable.cache=true --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --jars s3://xyz/jars/hoodie-spark-bundle-0.4.5-SNAPSHOT.jar --packages com.databricks:spark-avro_2.11:4.0.0

HDFS usage:

[hadoop@ip-xxx-116 ~]$ hdfs dfs -du -s -h '/*'
15.7 M   /apps
1.6 G    /hudi-data   (this is the location of the hudi table)
371.9 M  /tmp
47.7 G   /user
32.2 G   /var
[hadoop@ip-172-31-26-116 ~]$ date
Thu Feb 14 17:41:12 UTC 2019

Disk usage metrics from two of the nodes, to give an idea of per-node usage:

Slave node 1:

[hadoop@ip-xxx-179 ~]$ df -h
Filesystem         Size  Used  Avail  Use%  Mounted on
devtmpfs           121G   92K   121G    1%  /dev
tmpfs              121G     0   121G    0%  /dev/shm
/dev/xvda1         9.8G  3.3G   6.4G   34%  /
/dev/mapper/xvdb1  5.0G  125M   4.9G    3%  /emr
/dev/mapper/xvdb2  295G  161G   135G   55%  /mnt
/dev/mapper/xvdc   300G  164G   137G   55%  /mnt1

Slave node 2:

[hadoop@ip-xx-78 ~]$ df -h
Filesystem         Size  Used  Avail  Use%  Mounted on
devtmpfs           121G   92K   121G    1%  /dev
tmpfs              121G     0   121G    0%  /dev/shm
/dev/xvda1         9.8G  3.3G   6.4G   34%  /
/dev/mapper/xvdb1  5.0G  126M   4.9G    3%  /emr
/dev/mapper/xvdb2  295G  150G   146G   51%  /mnt
/dev/mapper/xvdc   300G  154G   147G   52%  /mnt1

n3nash commented 5 years ago

Jobs running for over 3 days are uncommon for Hudi unless you are doing some sort of bulkInsert of a few hundred terabytes of data. I think there's something we are missing here.

  1. What's the CompactionTool that you mention as the main class? Is it just a Hudi Spark job?
  2. What's your input batch size?
  3. I see that the location of the Hudi table shows 1.6 G; were you already able to ingest some data into a Hudi table?
  4. Can you please share your Hudi configs along with the Spark configs here? As @vinothchandar mentioned, a Spark UI snapshot would help immensely.

At Uber, we are able to ingest hundreds of gigabytes of data into Hudi datasets within 30-60 minutes. Our Spark jobs vary the number of executors depending on the size of the input batch.

pshivkumar commented 5 years ago

Thanks for the prompt responses. I created a new cluster and submitted the job again; it has been running for 2 hours. I now have access to the Spark UI.

  1. Yes, it is a Hudi Spark job. I am splitting the input dataset into 40 parts and loading each part into the Hudi table (a rough sketch of this split-and-write loop is shown after this list). Code link

  2. The input dataset is an HDFS Parquet dataset of 46 GB. I was not able to load it in one go because of the same disk-space issue, so the input is divided into 50 parts. Code link provided above.

[hadoop@ip-xxx-181 ~]$ hdfs dfs -du -s -h '/*'
15.7 M   /apps
596.1 M  /hudi-data
773.4 M  /tmp
47.7 G   /user
1.8 G    /var

  3. Yes. Since I am loading the data in multiple parts, the initial parts have already been loaded. The code link provided above has the Hudi configs that I use.

  4. Link to spark properties. Below is a screenshot of the job that is running now; I will upload another one after the job runs for a few more hours to give a better picture of the current run. screen shot 2019-02-20 at 2 07 28 pm
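For context, the split-and-write loop referenced in item 1 looks roughly like the following (a simplified sketch assuming a SparkSession named spark; paths, the split count, and variable names are illustrative, and the actual Hudi options are listed later in this thread):

    import org.apache.spark.sql.SaveMode

    // Simplified sketch of the split-and-load loop: read the 46 GB source table,
    // split it into roughly equal chunks, and write each chunk to the Hudi table.
    val source = spark.read.parquet("/path/to/source")          // placeholder path
    val chunks = source.randomSplit(Array.fill(50)(1.0))        // ~50 equal-weight splits

    chunks.zipWithIndex.foreach { case (chunk, i) =>
      chunk.write.format("com.uber.hoodie")
        // ... hoodie options as listed later in this thread ...
        .mode(if (i == 0) SaveMode.Overwrite else SaveMode.Append)
        .save("/hudi-data/table_path")                          // placeholder base path
    }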

n3nash commented 5 years ago

@pshivkumar From an initial look, what you are doing sounds reasonable, although ingesting 50 GB of data into Hudi should take no more than 15-30 minutes in the average case. We will need a little more information to help you debug the problem:

  1. Can you take a screenshot of the Stages tab in your Spark UI? Essentially, I'm interested in seeing how much shuffle there is in your stages.
  2. Can you tell me how many executors you used to run this? If you look at the Spark properties you linked, the spark.executor.instances field doesn't have a value. If you are relying on YARN, the default is 1 executor. You can also see the number of executors by clicking on the Executors tab in the Spark UI.
  3. Can you please share the hoodie configs?

On a side note, if you are just trying to import your dataset into Hudi format, look at this: https://hudi.apache.org/migration_guide.html#approach-2

pshivkumar commented 5 years ago

@n3nash Below is the stage info after the job ran for about 22 hours. I looked at the Parquet importer approach. Can I use it for datasets that have composite keys (I am using a custom key generator in my current Hudi job)?

  1. Attached is a screenshot of the Stages tab. screen shot 2019-02-21 at 10 34 09 am
  2. I ran with 20 executors. The Excel file I shared has that value, but it is hidden at the right end of the cell. Below is a screenshot of the Executors tab. screen shot 2019-02-21 at 10 26 21 am
  3. Hoodie configs:

    val writer = data.write.format("com.uber.hoodie")
      .option("hoodie.insert.shuffle.parallelism", parallelism)   // set to 1000
      .option("hoodie.upsert.shuffle.parallelism", parallelism)   // set to 1000
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, primaryKey)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, pkOrderingCol)
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, enableHiveSync)
      .option(HoodieCompactionConfig.CLEANER_FILE_VERSIONS_RETAINED_PROP, 1)
      .option(HoodieCompactionConfig.CLEANER_POLICY_PROP, "KEEP_LATEST_FILE_VERSIONS")
      .mode(insertMode)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "com.uber.hoodie.hive.NonPartitionedExtractor")
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "horizonHudiCompactor.HorizonKeyGenerator")

n3nash commented 5 years ago

@pshivkumar Yes, the HDFSParquetImporter tool only takes a single _row_key value, but you can easily change the tool to support composite keys; happy to take a PR from you :)

Additionally, I see that the input size of your batch for stage 419 is around 225 GB, which is larger than what we thought earlier (45 GB), whereas stage 463 correctly shows 46.6 GB. Can you tell me what the difference between these batches is? (Look at the mapToPair stage to see the actual input size.) 20 executors may not be enough to finish your job quickly; you may want to bump that up to around 100.

Also, in the Executors tab, for some reason I see 531 active executors whereas only 20 entries are listed. Not sure what's going on there.

I also see there's a lot of shuffle due to BloomIndex. @vinothchandar Is there a way to avoid these shuffles?

n3nash commented 5 years ago

@pshivkumar Checking again to see if you were able to make progress

pshivkumar commented 5 years ago

@n3nash The same dataset is split, and each chunk of that data is then passed through those stages. For some reason the map stage size keeps ballooning over time. My colleague is trying a different split approach and the properties suggested in issue https://github.com/uber/hudi/issues/246

I tried using HDFSParquetImporter on another dataset that does not have composite keys, to see if it works for my datasets. I am having a hard time making the Avro schema work for timestamp (yyyy-mm-dd hh:mm:ss) data. I tried string, bigint, long and int, and I get the exception java.lang.IllegalArgumentException (INT96 not yet implemented). This could be because of the Parquet version.

19/02/27 20:18:08 WARN TaskSetManager: Lost task 204.0 in stage 0.0 (TID 34, ip-172-31-18-237.ec2.internal, executor 2): java.lang.IllegalArgumentException: INT96 not yet implemented.
    at org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:251)
    at org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:236)
    at org.apache.parquet.schema.PrimitiveType$PrimitiveTypeName$7.convert(PrimitiveType.java:223)
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:235)
    at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:215)
    at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:209)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:124)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:175)
    at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:190)
    at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:147)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:189)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:186)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:141)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
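A possible workaround sketch (assuming Spark 2.3+, where spark.sql.parquet.outputTimestampType is available, and a SparkSession named spark) would be to rewrite the source Parquet so timestamps are stored as INT64 before running the importer; paths here are placeholders and this is untested in this setup:

    // Possible workaround sketch: re-write the source Parquet so timestamp columns are
    // stored as INT64 (TIMESTAMP_MICROS) instead of INT96, which parquet-avro cannot read.
    spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
    spark.read.parquet("/path/to/int96-source")
      .write.mode("overwrite")
      .parquet("/path/to/int64-copy")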

Is it possible to make this work with the Hive table schema without having to create an Avro schema file?

vinothchandar commented 5 years ago

@pshivkumar Maybe we can do a hangout sometime and try to understand what's going on? That might be a much easier route.

"Is it possible to make this work with the Hive table schema without having to create an Avro schema file?" Hmmm, we need a schema to register the Hive table, and that is derived from the parquet files. What Parquet version are you on?
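As an aside, if the goal is just to avoid hand-writing the .avsc file, one option might be to derive it from the source data's own schema using the spark-avro package that is already in the job's --packages. A rough sketch, assuming a SparkSession named spark and the SchemaConverters helper from com.databricks spark-avro 4.x (note that this alone does not address the INT96 read error):

    import com.databricks.spark.avro.SchemaConverters
    import org.apache.avro.SchemaBuilder

    // Rough sketch: derive an Avro schema from the source Parquet's Spark schema
    // instead of writing the .avsc by hand (paths and record name are placeholders).
    val df = spark.read.parquet("/path/to/source-parquet")
    val avroSchema = SchemaConverters.convertStructToAvro(
      df.schema,
      SchemaBuilder.record("SourceRecord").namespace("example"),
      "example")
    println(avroSchema.toString(true))   // JSON that could be saved as the schema file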

vinothchandar commented 5 years ago

"Below is the stage info after the job ran for about 22 hours"

I can't understand how the job ran for 22 hours based on the UI. Individual stages take only a few minutes, right? Are we missing something? Also, the number of executors you have is 500 or so; 20 is just what the UI shows at one time. If you notice, below the "Executors" line there is a dropdown that limits the list to 20 executors.

pshivkumar commented 5 years ago

"@pshivkumar Maybe we can do a hangout sometime and try to understand what's going on? That might be a much easier route."

"Is it possible to make this work with the Hive table schema without having to create an Avro schema file? Hmmm, we need a schema to register the Hive table, and that is derived from the parquet files. What Parquet version are you on?"

@vinothchandar Yes, a Hangout will help. I am in the PDT time zone; any Tuesday/Thursday works for me. Once you confirm, I will run the job again and keep the cluster running for our meeting.

vinothchandar commented 5 years ago

@pshivkumar Have you joined our Slack channel? https://hudi.apache.org/community.html If you can hop on there, we can coordinate a time. Next Tuesday should be doable.

bvaradar commented 5 years ago

Closing due to inactivity