apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.31k stars 2.41k forks source link

[SUPPORT] Spark Structured Streaming To Hudi Sink Datasource taking much longer #1878

Closed rubenssoto closed 4 years ago

rubenssoto commented 4 years ago

Hi, how are you?

Im using EMR 5.30.1, spark 2.4.5, hudi 0.5.2 and my data is store in S3.

Since today Im trying to migrate some of our datasets in production to apache hudi, Im having problems with the first, could you help me please?

It is a small dataset, 26gb distributed by 89 parquet files. Im reading the data with structured streaming, reading 4 files per trigger, when I write the stream in a regular parquet, works, but if I use hudi doenst work.

This is my hudi options, I tryed with or without shuffle options, I need files more than 500mb with max 1000mb

hudi_options = { 'hoodie.table.name': tableName, 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'event_date', 'hoodie.datasource.write.table.name': tableName, 'hoodie.datasource.write.operation': 'insert', 'hoodie.datasource.write.precombine.field': 'LineCreatedTimestamp', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.parquet.small.file.limit': 500000000, 'hoodie.parquet.max.file.size': 800000000, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': tableName, 'hoodie.datasource.hive_sync.database': 'datalake_raw', 'hoodie.datasource.hive_sync.partition_fields': 'event_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.hive_sync.jdbcurl':'jdbc:hive2://:10000', 'hoodie.insert.shuffle.parallelism': 20, 'hoodie.upsert.shuffle.parallelism': 20 }

My read and write functions:

def read_parquet_stream(spark_session, read_folder_path, data_schema, max_files_per_trigger): spark = spark_session df = spark \ .readStream \ .option("maxFilesPerTrigger", max_files_per_trigger) \ .schema(data_schema) \ .parquet(read_folder_path) return df

def write_hudi_dataset_stream(spark_data_frame, checkpoint_location_folder, write_folder_path, hudi_options): df_write_query = spark_data_frame \ .writeStream \ .options(**hudi_options) \ .trigger(processingTime='20 seconds') \ .outputMode('append') \ .format('hudi')\ .option("checkpointLocation", checkpoint_location_folder) \ .start(write_folder_path) df_write_query.awaitTermination()

I caught some errors:

Job aborted due to stage failure: Task 11 in stage 2.0 failed 4 times, most recent failure: Lost task 11.3 in stage 2.0 (TID 53, ip-10-0-87-171.us-west-2.compute.internal, executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.3 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714. My cluster is small, but the data is small too master: 4 cores and 16gb ram nodes: 2 nodes with 4 cores and 16gb each

If I write stream in a regular parquet takes 38min to finish the job, but in hudi it have been passed more then one hour and half and job haven't finished yet.

Could you help me? I need to put this job in production as soon as possible.

Thank you Guys!!!

Captura de Tela 2020-07-24 às 17 26 45 Captura de Tela 2020-07-25 às 00 03 26 Captura de Tela 2020-07-25 às 00 05 00
rubenssoto commented 4 years ago

I tried resizing the cluster with 3 more nodes, so in total(4 nodes) after resizing I had 4 cores in each node and 16gb of ram each, and it wasn't any difference, the job keeps very slow and with memory errors.

bvaradar commented 4 years ago

This is a spark tuning issue in general. The slowness is due to memory pressure and node failures due to it. Atleast in one of the batches, I see task failures (and retries) during reading from source parquet file itself.

As mentioned in the suggestion "Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.", you need to increase spark.yarn.executor.memoryOverhead. You are running 2 executors per machine with 8GB room for each which may not have lot of room. If you are trying to compare parquet write with hudi, note that hudi adds metadata fields which gives incremental pull, indexing and other benefits. If your original record size is very small and comparable to metadata overhead and your setup is already close to hitting the limit for parquet write, then you would need to give more resources.

On a related note, since you are trying to use streaming for bootstrapping from a fixed source, have you considered using bulk insert or insert (for size handling) in batch mode which would sort and write the data once. With this mode of incremental inserting, Hudi would try to increase a small file generated in the previous batch. This means that it has to read the small file and apply new insert and write a newer version of the file (which is bigger). As you can see, more the number of iterations here, the more repeated reads will happen. Hence, you would benefit by throwing more resources for a potentially shorter time to do this migration.

rubenssoto commented 4 years ago

Hi bvaradar, thank you for your awnser.

I tried to increase spark.yarn.executor.memoryOverhead to 2GB with foreachbatch option inside writestream and it worked. 4 nodes with 4 cores and 32gb each, took 52 minutes is a good time for this hardware configuration? I think that could be better but Im very happy. Whats the difference with spark streaming with or without foreachbatch, Am I lost anything important? I tried, because I saw in delta lake docs, they use foreachbatch for merge in spark streaming.

Captura de Tela 2020-07-25 às 18 04 27 Captura de Tela 2020-07-25 às 18 04 53 Captura de Tela 2020-07-25 às 18 04 40

Some jobs took more time, do you know why some jobs created a lot of tasks? I think that could be more efficient if they write with fewer tasks. Now I will try do the same thing with write operation "upsert" because my data set could have some duplicated values and I don't know what files are they.

rubenssoto commented 4 years ago

Hi Again. 👍

When I changed the insert option to upsert the performance got worse. 1 Master Node m5.xlarge(4 vcpu, 16gb Ram) 1 Core Node r5.xlarge(4 vcpu, 32gb ram) 4 Task Nodes r5.xlarge(4 vcpu, 32 ram) spark.yarn.executor.memoryOverhead: 2048 Im reading 10 files on each trigger, at the beginning my file size is 1gb each

hudi options hudi_options = { 'hoodie.table.name': tableName, 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'event_date', 'hoodie.datasource.write.table.name': tableName, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'LineCreatedTimestamp', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.parquet.small.file.limit': 500000000, 'hoodie.parquet.max.file.size': 900000000, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': tableName, 'hoodie.datasource.hive_sync.database': 'datalake_raw', 'hoodie.datasource.hive_sync.partition_fields': 'event_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.hive_sync.jdbcurl':'jdbc:hive2://ip-10-0-53-190.us-west-2.compute.internal:10000' }

I totally understand what you said about hudi metadata and ordering operations, but I'm trying to process only 25gb of data and only on tasks nodes I have more than 100gb of ram, I am probably doing something wrong hehehe All process took 1 hour and 40 minutes.

Captura de Tela 2020-07-25 às 22 03 23 Captura de Tela 2020-07-25 às 22 04 03 Captura de Tela 2020-07-25 às 22 03 34

I tried the same operation in batch mode with insert operation it took 46 minutes, the overall performance it seems much better in batch mode like you could see in the follow image

Captura de Tela 2020-07-25 às 23 22 30

but this batch execution created a lot of 50Mb files, is there way to get better?


I think to process big workloads in batch mode with insert operation could be much more scalable, what do you think? My situation is, I have some datasets that I need to process all data, my data has to be deduplicated because is CDC data and after that I need to keep updating the data with streaming. These new datasets will be a source to create many others tables in the company.

Could you advise me wich could be the better solution? I think, that I could batch all data and after that keep running a streaming solution to keep the data updated.

Last question, when I run in insert mode on streaming job with foreachbatch, hudi will deduplicate only data that exist inside this specific batch? For example, I'm reading 10 files on each trigger, so, if in the next batch trigger has data that exists in the previous batch trigger, data wont be deduplicate, I'm right?

Thank you so much, and I'm sorry for a lot of query, but I need to use Hudi on production ASAP

bvaradar commented 4 years ago

With inserts, there is no index lookup which essentially means we are appending to a dataset but making sure we are writing to correct partition path and ensuring file sizing is honored.

With upserts, we also do index lookup to ensure record level merge happens fine. Its important to know the workload pattern before I can say what is happening. Can you please attach the output of "commits show --includeExtraMetadata". More related questions : Is the record key : "id" a UUID ? Is there a natural ordering to the keys ? If there is natural ordering, you will benefit by initially using bulk Insert (which sorts and writes the data) during the initial bootstrap followed by upserts.

Note that there is fundamental bottleneck with taking in updates for Copy-On-Write mode: If your input batch has updates present in a large number of files, then those files need to be rewritten. You can throw in more executors to write them all in parallel but parquet writing will be a lower bound.

Merge On Read mode performs way better in this case. For Structured streaming, you can try using Merge-On-Read mode (Need to have this PR patch : https://github.com/apache/hudi/pull/1752 to automatically manage compactions). Here, Hudi does not write Parquet file for every single batch but just writes them to delta-log files. Note that if you are planning to use DeltaStreamer, Merge-On-Read with async compaction is already supported in 0.5.x releases which will be ideal for your CDC use-case.

rubenssoto commented 4 years ago

Hi bvaradar, how are you? I hope doing fine!

I have a new case, which is a little more important to me, the problem is almost the same. I adopted the strategy to first batch all data in an insert operation and after that, get the latest data with structured streaming.

Answer your question, all my tables have PK with integers id and normally they are auto-increment. Does Hudi already order data in an insert operation by pk? Because in my first batch I am sorting the data by date, is it necessary?

I think I have the CoW problem that you said. I have an order table with my clients orders, every minute new orders arrive, and my clients could give a grade to the order at any point in time, for example in a streaming batch could have a client order grade for an order that was made in the last month.

This table, today, is very small, in hudi dataset, are 15 files of 500mb each, I didn't partition the table because a daily partition is small and partition by month I think don't make sense. My streaming is running right now, but Hudi rewrites all 15 files every streaming batch, my data is small, so its fine, but I think it is not efficient and when data the grows it could become a problem.

I will use aws Athena to query all my tables and this specific order table may be delayed up to 15 minutes. I saw that Athena only query Read Optmized MoR, how MoR could help me in this case?

The last question, in an insert operation, how can I control the file size?

Thank you for your time!

Some images of my streaming:

Captura de Tela 2020-07-29 às 02 04 06 Captura de Tela 2020-07-29 às 02 03 54 Captura de Tela 2020-07-29 às 02 03 33 Captura de Tela 2020-07-29 às 02 01 51

Hudi Options In batch Insert

hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.recordkey.field': primary_key_column, 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': write_operation, 'hoodie.combine.before.insert': 'true', 'hoodie.combine.before.upsert': 'false', 'hoodie.datasource.write.precombine.field': precombined_column, 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator', 'hoodie.parquet.small.file.limit': 700000000, 'hoodie.parquet.max.file.size': 900000000, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.database': 'datalake_raw', 'hoodie.datasource.hive_sync.jdbcurl':'jdbc:hive2://ip-10-0-88-165.us-west-2.compute.internal:10000', 'hoodie.upsert.shuffle.parallelism': 60, 'hoodie.insert.shuffle.parallelism': 60 }

Hudi Options In UPSERT

hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.recordkey.field': primary_key_column, 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': write_operation, 'hoodie.combine.before.insert': 'false', 'hoodie.combine.before.upsert': 'true', 'hoodie.datasource.write.precombine.field': precombined_column, 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator', 'hoodie.parquet.small.file.limit': 700000000, 'hoodie.parquet.max.file.size': 900000000, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.database': 'datalake_raw', 'hoodie.datasource.hive_sync.jdbcurl':'jdbc:hive2://ip-10-0-88-165.us-west-2.compute.internal:10000' }

Is there any options wrong?

bvaradar commented 4 years ago

For a monotonically increasing id, you can use bulk-insert instead of insert for first time loading of files, this would nicely order records by the id and your range-pruning during index lookup would be efficient. The parallelism configuration https://hudi.apache.org/docs/configurations.html#withBulkInsertParallelism controls the number of file getting generated.

I will use aws Athena to query all my tables and this specific order table may be delayed up to 15 minutes. I saw that Athena only query Read Optmized MoR, how MoR could help me in this case? ===>. Would let @umehrot2 answer this question. But If your use-case allows, you can schedule compaction for the MOR table at a frequency to align with a SLA that you want to maintain. This way you can still query those data using RO.

For the insert operations, the same config (as in upsert) controls file sizing ('hoodie.parquet.max.file.size')

rubenssoto commented 4 years ago

Bulk-insert do some deduplication?

bvaradar commented 4 years ago

Set hoodie.combine.before.insert=true for deduping during bulk insert

rubenssoto commented 4 years ago

Thank you so much @bvaradar for your help