apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Processing time very Slow Updating records into Hudi Dataset(MOR) using AWS Glue #4873

Open cafelo-pfdrive opened 2 years ago

cafelo-pfdrive commented 2 years ago

Describe the problem you faced

Long execution time for upserts in Hudi: it takes 4 or 5 times longer doing updates than inserts. 90% of the data needs to be updated.

The code below takes around 45 minutes to write new data (300 million records) to an AWS S3 bucket in Hudi format with 21 DPUs using AWS Glue, but it takes more than 3 hours to ingest the same data set again in order to update it and remove duplicates. Data can be resent multiple times to correct its quality, and consumers only need the latest version of each record key.

Additional context

In the Apache Spark UI, the stage "Building workload profile - mapToPair at SparkHoodieBloomIndex.java:266", which takes the longest in the execution plan, shuffles the following:

- Shuffle Read Size / Records: 42.6 GiB / 540 000 000
- Shuffle Write Size / Records: 1237.8 GiB / 23 759 659 000
- Spill (Memory): 7.7 TiB
- Spill (Disk): 1241.6 GiB

Expected behavior

We have a window of 1 hour to execute the ETL process, which includes both inserts and updates. That means that if inserting alone takes 45 minutes, the updates should not take longer than 1 hour to process 300 million records.

To Reproduce

Steps to reproduce the behavior:

Environment Description

AWS Glue basic properties:

- Glue version: Glue 2.0 (supports Spark 2.4, Scala 2, Python)
- Worker type: G.1X
- Number of workers: 31 (1 driver and 30 executors) ==> 160 cores

Code

    # Glue job: read parquet from the bronze bucket and upsert it into the Hudi MoR table.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.functions import year, month, date_format, to_date, col
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.session import SparkSession

    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    spark = SparkSession.builder \
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
        .config('spark.sql.hive.convertMetastoreParquet', 'false') \
        .getOrCreate()
    sc = spark.sparkContext
    glueContext = GlueContext(sc)
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    targetPath = 's3://poc-lake-silver/interval_mor/data/'

    commonConfig = {
        'className': 'org.apache.hudi',
        'path': 's3://poc-lake-silver/interval_mor/data/',
        'hoodie.bulkinsert.shuffle.parallelism': 320,
        'hoodie.upsert.shuffle.parallelism': 320,
        'hoodie.datasource.write.operation': 'upsert'
    }

    partitionDataConfig = {
        'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
    }

    dataSourceWriteConfig = {
        'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
        'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
        'hoodie.datasource.write.precombine.field': 'ingestionutc',
        'hoodie.datasource.write.partitionpath.field': 'intervaldate,plantuid',
        'hoodie.datasource.write.recordkey.field': 'intervalutc,asset,attribute'
    }

    dataSourceHiveConfig = {
        'hoodie.datasource.hive_sync.use_jdbc': 'false',
        'hoodie.datasource.hive_sync.enable': 'true',
        'hoodie.datasource.hive_sync.database': 'db_swat_lake_silver',
        'hoodie.datasource.hive_sync.table': 'interval_mor',
        'hoodie.datasource.hive_sync.partition_fields': 'intervaldate,plantuid'
    }

    dataTableConfig = {
        'hoodie.table.type': 'MERGE_ON_READ',
        'hoodie.index.type': 'BLOOM',
        'hoodie.table.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
        'hoodie.database.name': 'db_swat_lake_silver',
        'hoodie.table.name': 'interval_mor',
        'hoodie.table.precombine.field': 'ingestionutc',
        'hoodie.table.partition.fields': 'intervaldate,plantuid',
        'hoodie.table.recordkey.fields': 'intervalutc,asset,attribute'
    }

    # Merge all Hudi option maps into a single options dict.
    finalConf = {**commonConfig, **partitionDataConfig, **dataSourceWriteConfig,
                 **dataSourceHiveConfig, **dataTableConfig}

    S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
        format_options={},
        connection_type="s3",
        format="parquet",
        connection_options={
            "paths": [
                's3://poc-lake-bronze/dc/interval/data/ingest_date=20220221/ingest_hour=04/',
            ],
            "recurse": True
        },
        transformation_ctx="S3bucket_node1",
    )

    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=[
            ("plant", "int", "plant", "int"),
            ("plantuid", "string", "plantuid", "string"),
            ("asset", "int", "asset", "int"),
            ("attribute", "string", "attribute", "string"),
            ("value", "double", "value", "double"),
            ("intervalutc", "timestamp", "intervalutc", "timestamp"),
            ("intervaldate", "string", "intervaldate", "string"),
            ("ingestiontsutc", "timestamp", "ingestionutc", "timestamp"),
        ],
        transformation_ctx="ApplyMapping_node2",
    )

    S3bucket_node3 = ApplyMapping_node2.toDF()
    S3bucket_node3.write.format('org.apache.hudi').options(**finalConf).mode('Append').save(targetPath)

    job.commit()

Stacktrace

Spark Environment image

Executors Summary image

Stages image

countByKey at SparkHoodieBloomIndex.java:114 image

Building workload profile - mapToPair at SparkHoodieBloomIndex.java:266 image

nsivabalan commented 2 years ago

Since you are using a complex record key, I feel the range pruning with bloom is not effective. Bloom filters will be effective only if your record keys have some timestamp characteristics, so that we can trim a few file groups with just the min and max values of the record keys stored in them.

So, I would recommend you try out the "SIMPLE" index instead. For random or large updates, this might work out better. Do give this blog a read to understand index types in Hudi. Also, you can check out the configs for the simple index here.
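For illustration, a minimal sketch of what that switch could look like on top of the job's existing finalConf (the parallelism value is an assumption, not a number recommended here):

    # Hypothetical override: replace the BLOOM index with the SIMPLE index,
    # which joins incoming keys against existing keys instead of using bloom filters.
    simpleIndexConfig = {
        'hoodie.index.type': 'SIMPLE',
        # Parallelism of the simple index join; 320 mirrors the shuffle parallelism
        # already used in this job and is only an assumption.
        'hoodie.simple.index.parallelism': 320,
    }

    finalConf = {**finalConf, **simpleIndexConfig}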

cafelo-pfdrive commented 2 years ago

Could you elaborate on what "timestamp characteristics" means for the record keys in Bloom?

Gatsby-Lee commented 2 years ago

@cafelo-pfdrive it is something that increases incrementally.

Gatsby-Lee commented 2 years ago

@nsivabalan I have a question. In the reported config, the record key has three fields. Do all three fields have to have "timestamp characteristics"?

cafelo-pfdrive commented 2 years ago

Hello @Gatsby-Lee, the execution time increases incrementally: the more data is written, the worse the update times get. The data types are as follows:

'hoodie.datasource.write.partitionpath.field': 'intervaldate,plantuid' (intervaldate: string, plantuid: string)

'hoodie.datasource.write.recordkey.field': 'intervalutc,asset,attribute' (intervalutc: timestamp, asset: integer, attribute: string)

cafelo-pfdrive commented 2 years ago

@nsivabalan, we tried the SIMPLE index but the execution times are very similar. Could you elaborate more on what "timestamp characteristics" means for the record keys in Bloom? What are the best practices for defining the data types of the record key fields? Thanks in advance.

Gatsby-Lee commented 2 years ago

@cafelo-pfdrive I saw you use MERGE_ON_READ in AWS Glue (I use Glue as well). How do you run table compaction? What values do you use for these configs?

cafelo-pfdrive commented 2 years ago

@Gatsby-Lee. We don't have any compaction process running aside from the default process, so I am not using the parameters you are asking about.

Gatsby-Lee commented 2 years ago

@cafelo-pfdrive Oh, you need compaction if you want to use MoR. In Glue (if I am right), table compaction doesn't happen automatically for you (you might have to set up a new Glue job that does the compaction if you want to run it async). If compaction doesn't happen, you will have lots of small files in the data directory, and that will keep slowing down the overall Hudi writes.

I think hoodie.compact.inline is false by default. hoodie.datasource.compaction.async.enable is true by default.

If you don't want to bring up an extra Glue job for the compaction, you can set hoodie.compact.inline to true, and the compaction service will run on every write. If so, I guess you might not get the full benefit of using MoR.

I hope this helps you.

nsivabalan commented 2 years ago

Yes, do set hoodie.compact.inline to true and get the compaction moving; later you can think about moving to the async flow.
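For example, a minimal sketch of the inline compaction settings layered on top of the job's existing finalConf (the delta-commit threshold below is only an illustrative assumption):

    # Hypothetical compaction overrides for the MoR table.
    compactionConfig = {
        # Run the compaction service synchronously as part of each write.
        'hoodie.compact.inline': 'true',
        # Number of delta commits to accumulate before a compaction is scheduled;
        # 5 is only an illustrative value.
        'hoodie.compact.inline.max.delta.commits': 5,
    }

    finalConf = {**finalConf, **compactionConfig}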

By the way, with regard to partitioning strategy, it depends on your query patterns. If your queries mostly have predicates on dates, it is a wise choice to partition by date.

Even your ingestion might speed up depending on your partitioning strategy.

For example, if your dataset is date partitioned and your incoming records have data for only 5 partitions, the index lookup happens only among those 5 partitions. But if your partitioning is complex and based on both date and some other field X, the number of partitions touched might be higher.

Also, do check the cardinality of your partitioning scheme. Ensure you have neither a very high number of (small) partitions nor too few (large) partitions; try to find some middle ground.

You also asked about the timestamp characteristics of record keys. Let me try to illustrate with an example. Let's say your record keys are literally a timestamp field:

- commit1: you ingest records with min and max values t1 to t100, and this goes into data file1
- commit2: you ingest records with min and max values t101 to t201, and this goes into data file2
- ...
- commit10: you ingest records with min and max values t900 to t1000, and this goes into data file10

Now, let's say you have some updates for records with keys t55 to t65 and t305 to t310. Since each file is nicely laid out, using just the min and max values, all files except file1 and file3 will be filtered out in the first step of the index. Then the bloom filter lookup happens, and the actual files are read to find the right locations for the keys.

Alternatively, let's say your record keys are random:

- commit1 -> data file1: min and max values are t5 and t500
- commit2 -> data file2: min and max values are t50 and t3000
- ...
- commit10 -> data file10: min and max values are t70 and t2500

When we get some updates, pretty much all files will be considered for the second step of the index. In other words, min/max-based pruning will not be effective, and it just adds to your latency.

Hope this clarifies what I mean by timestamp characteristics in your record keys.
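As a rough illustration of that pruning step (a toy sketch in plain Python with made-up, zero-padded key ranges; not the actual Hudi implementation):

    # Toy model of min/max (range) pruning: each data file advertises the min and
    # max record key it contains, and only files whose range could contain an
    # incoming key are checked further (bloom filter lookup, then file scan).
    files = {
        'file01': ('t001', 't100'),
        'file02': ('t101', 't201'),
        'file03': ('t202', 't302'),
    }

    def candidate_files(update_keys, files):
        """Return the files whose [min, max] key range overlaps any update key."""
        return {
            name
            for name, (lo, hi) in files.items()
            for key in update_keys
            if lo <= key <= hi
        }

    # With monotonically increasing keys, pruning keeps only one file here.
    print(candidate_files(['t055', 't065'], files))  # {'file01'}

    # With random keys, every file's range would overlap the incoming keys,
    # so nothing is pruned and every file group has to be checked.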

nsivabalan commented 2 years ago

Can you list the .hoodie folder in your Hudi table base path? I want to check whether compaction happened or never kicked in.

cafelo-pfdrive commented 2 years ago

@Gatsby-Lee, @nsivabalan: thanks much for the explanation. Based on your recommendation I changed the index to 'hoodie.index.type': 'SIMPLE' and added a new column to the partition path; performance really improved compared with 'BLOOM'.

I also added 'hoodie.compact.inline': 'true' and 'hoodie.datasource.compaction.async.enable': 'false',

but I still don't see the files getting compacted.

image

Thanks again for the support.

cafelo-pfdrive commented 2 years ago

@nsivabalan, here is the .hoodie folder. Please let me know if this is the information you are asking for. image image

Gatsby-Lee commented 2 years ago

@cafelo-pfdrive I think the compaction might happen based on the compaction trigger strategy (the default is NUM_COMMITS),

so I think the compaction is triggered every 10 delta commits.

cafelo-pfdrive commented 2 years ago

@Gatsby-Lee, really appreciate your answer. Here is the latest .hoodie folder. I see a new "clean" file. What type of file should be created for the compaction?

image

Gatsby-Lee commented 2 years ago

@cafelo-pfdrive Unfortunately, it seems I didn't document what I investigated about compaction. I guess when compaction runs, you might be able to find xxx.compaction.request files somewhere in the data directory's .hoodie/.aux.

I saw that there were several xxx.compaction.request files in the directory that were handled and then removed.

You can try inline compaction and check the logs by searching for "SparkRunCompactionActionExecutor".
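If it helps, a minimal sketch of checking the timeline from Python (assuming boto3 access and the bucket/prefix from the table path posted earlier in this thread; the exact timeline file names vary by Hudi version):

    import boto3

    # Assumed bucket/prefix, taken from the table path posted earlier in this thread.
    BUCKET = 'poc-lake-silver'
    PREFIX = 'interval_mor/data/.hoodie/'

    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')

    # List timeline files and print anything that looks compaction-related.
    for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
        for obj in page.get('Contents', []):
            if 'compaction' in obj['Key']:
                print(obj['Key'], obj['LastModified'])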

I hope this helps

cafelo-pfdrive commented 2 years ago

Thanks @Gatsby-Lee. I noticed that the step "Building workload profile - countByKey at BaseSparkCommitActionExecutor.java:175" only uses the DPUs x cores. Is there a way to parallelize it more? It looks like this step is the bottleneck.

Thanks much

Gatsby-Lee commented 2 years ago

@cafelo-pfdrive Thank you. Have you figured out or confirmed if the Async Table Compaction runs in AWS Glue Streaming? I haven't confirmed it yet, but based on what I saw before, it seems Async Table Compaction didn't run.

pmgod8922 commented 2 years ago

> @cafelo-pfdrive Thank you. Have you figured out or confirmed if the Async Table Compaction runs in AWS Glue Streaming? I haven't confirmed it yet, but based on what I saw before, it seems Async Table Compaction didn't run.

@Gatsby-Lee Can you provide the Hudi (MOR) configuration? We would like to refer to it. Looking forward to your reply. Thank you!

cafelo-pfdrive commented 2 years ago

Hello @Gatsby-Lee, here is the Hudi (MOR) configuration:

    commonConfig = {
        'className': 'org.apache.hudi',
        'path': 's3://poc-lake-silver/interval_mor_hour/data/',
        'hoodie.upsert.shuffle.parallelism': 160,
        'hoodie.simple.index.parallelism': 320,
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.table.type': 'MERGE_ON_READ',
        'hoodie.index.type': 'SIMPLE'
    }

    partitionDataConfig = {
        'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
    }

    dataSourceWriteConfig = {
        'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
        'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
        'hoodie.datasource.write.precombine.field': 'ingestionutc',
        'hoodie.datasource.write.partitionpath.field': 'plantuid,intervaldate,intervalhour',
        'hoodie.datasource.write.recordkey.field': 'intervalutc,asset,attribute'
    }

    dataSourceHiveConfig = {
        'hoodie.datasource.hive_sync.use_jdbc': 'false',
        'hoodie.datasource.hive_sync.enable': 'true',
        'hoodie.datasource.hive_sync.database': 'db_swat_lake_silver',
        'hoodie.datasource.hive_sync.table': 'interval_mor_hour',
        'hoodie.datasource.hive_sync.partition_fields': 'plantuid,intervaldate,intervalhour'
    }

    dataTableConfig = {
        'hoodie.table.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
        'hoodie.database.name': 'db_swat_lake_silver',
        'hoodie.table.name': 'interval_mor_hour',
    }

Please see also the Spark session configuration:

    spark = SparkSession.builder \
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
        .config('spark.driver.memory', '10') \
        .config('spark.executor.memory', '8g') \
        .config('spark.executor.cores', '4') \
        .config('spark.executor.instances', '160') \
        .config('spark.driver.memoryOverhead', '1024') \
        .config('spark.executor.memoryOverhead', '1024') \
        .config('spark.default.parallelism', '160') \
        .config('spark.sql.shuffle.partitions', '320') \
        .config('spark.memory.storageFraction', '0.2') \
        .config('spark.sql.hive.convertMetastoreParquet', 'false') \
        .getOrCreate()

Running the job in AWS Glue with 20 DPUs; each worker maps to 1 DPU (4 vCPU, 16 GB of memory, 64 GB of disk).

cafelo-pfdrive commented 2 years ago

@Gatsby-Lee , @nsivabalan

We are processing around 775 million records, of which only 30% are inserts and the rest are updates. The job takes around 57 minutes. Highlighted in yellow is the stage that takes the most computation time. image

Gatsby-Lee commented 2 years ago

@pmgod8922

Hi, my config is pretty much like the one in https://github.com/apache/hudi/issues/4896, with some changes to use MoR:

    'hoodie.compact.inline': 'false'
    'hoodie.datasource.compaction.async.enable': 'true'
tjtoll commented 2 years ago

> Since you are using a complex record key, I feel the range pruning with bloom is not effective. Bloom filters will be effective only if your record keys have some timestamp characteristics, so that we can trim a few file groups with just the min and max values of the record keys stored in them.
>
> So, I would recommend you try out the "SIMPLE" index instead. For random or large updates, this might work out better. Do give this blog a read to understand index types in Hudi. Also, you can check out the configs for the simple index here.

Is it only the record key that needs the timestamp characteristics, or the partitioning as well? For example, if I have a random record key but my partitions are by date, is BLOOM still beneficial?

Also, on tables where I do have an incrementing record key, why doesn't Hudi sort those before writing them? The files it writes have huge/overlapping ranges of record keys.

nsivabalan commented 2 years ago

@CodeCooker17 :

We have done a lot of improvements around performance with Hudi: https://hudi.apache.org/blog/2022/06/29/Apache-Hudi-vs-Delta-Lake-transparent-tpc-ds-lakehouse-performance-benchmarks

Can you try out 0.12 and let us know if you are still facing the perf issue.

nsivabalan commented 1 year ago

@CodeCooker17: can we have an update in this regard? If you are still facing issues, let us know and we can take another detailed look.

nsivabalan commented 1 year ago

By the way, I'm not sure if I have called this out before: I see you are partitioning by hour. This results in very high cardinality with respect to the number of partitions, more than 25k partitions for a few years of data. Generally it is advisable to keep the total number of partitions at 10k or less; if not, we have to spend a lot of time on performance tuning. Alternatively, you can employ clustering to cluster your data based on hour and reap similar benefits via column-stats pruning with the metadata table.
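For illustration, a rough sketch of what inline clustering could look like on this table (the sort column and values below are assumptions, not settings validated in this thread):

    # Hypothetical inline clustering setup: keep coarser partitions and instead
    # sort/cluster file groups by the timestamp column so column stats can prune files.
    clusteringConfig = {
        'hoodie.clustering.inline': 'true',
        # Number of commits between clustering runs; 4 is only an illustrative value.
        'hoodie.clustering.inline.max.commits': 4,
        # Column to sort by when rewriting file groups; assumes the 'intervalutc'
        # timestamp column from this dataset is the natural choice.
        'hoodie.clustering.plan.strategy.sort.columns': 'intervalutc',
    }

    finalConf = {**finalConf, **clusteringConfig}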

nsivabalan commented 1 year ago

Another alternative to consider: we added a bucket index, and a consistent hashing bucket index landed in master recently. That is another option to try, since there is no index lookup overhead.
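For reference, a rough sketch of the bucket index options (the engine choice and bucket count below are assumptions for illustration; consistent hashing needs a build that includes the recent change mentioned above):

    # Hypothetical bucket index setup: record keys are hashed into a fixed number
    # of buckets per partition, so there is no bloom/simple index lookup step.
    bucketIndexConfig = {
        'hoodie.index.type': 'BUCKET',
        # 'SIMPLE' is the default bucket engine; 'CONSISTENT_HASHING' is the newer one.
        'hoodie.index.bucket.engine': 'SIMPLE',
        # Buckets per partition; 256 is only an illustrative value and should be
        # sized from the expected data volume per partition.
        'hoodie.bucket.index.num.buckets': 256,
        # Field used for hashing; defaults to the record key fields.
        'hoodie.bucket.index.hash.field': 'intervalutc,asset,attribute',
    }

    finalConf = {**finalConf, **bucketIndexConfig}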