apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Hudi upsert doesn't trigger compaction for MOR #4839

Closed: shahiidiqbal closed this issue 2 years ago

shahiidiqbal commented 2 years ago

We have a Spark streaming job to ingest real-time data coming through Kafka Connect from MongoDB. Somehow the Hudi upsert doesn't trigger compaction, and if we look at the partition folders there are thousands of log files that should be cleaned up after compaction. There are also lots of files, including .commits_.archive, .clean, .clean.inflight, .clean.requested, .deltacommit, .deltacommit.inflight and .deltacommit.requested, in the .hoodie folder.

I used the following CLI commands, but they show nothing / empty records:

cleans show
compactions show all
show fsview all

Here are the configs we use for Hudi Spark streaming on AWS EMR 6 and Hudi 0.9:

'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field': 'entity_id',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.database': DATABASE_NAME,
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.hive_sync.jdbcurl': 'jdbc:hive2://url',
'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
'hoodie.datasource.write.precombine.field': 'event_time',
'hoodie.payload.ordering.field': 'event_time',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.index.type' : 'GLOBAL_BLOOM',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2

'hoodie.table.name': TABLE_NAME,
'hoodie.datasource.hive_sync.table': TABLE_NAME,
'hoodie.datasource.write.partitionpath.field': 'partition_date',
'hoodie.datasource.hive_sync.partition_fields': 'partition_date',

Can anyone help with what we are missing to enable compaction, and with how it works? We are unable to get updated data from the read-optimized view (_ro table) even after hours.

mysqldba-nandy commented 2 years ago

We changed the default compaction options compaction.trigger.strategy, compaction.delta_commits and compaction.delta_seconds for our Flink jobs; maybe you need the hoodie.compact.inline.xxx options too. https://hudi.apache.org/docs/configurations/#Compaction-Configs
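For illustration only, a hedged sketch of what such compaction options could look like on the Spark side if added to the writer config above (keys taken from the linked Compaction Configs page; the values are arbitrary examples, not recommendations):

'hoodie.compact.inline': 'false',                        # example: keep compaction out of the write path
'hoodie.datasource.compaction.async.enable': 'true',     # example: async compaction (streaming sink only)
'hoodie.compact.inline.max.delta.commits': 5,            # example: schedule compaction every 5 delta commits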

nsivabalan commented 2 years ago

Are you using Kafka Connect for Hudi, or are you using Spark streaming? In the case of Kafka Connect, compaction has to be executed separately; only scheduling is done by the Kafka Connect nodes. If it's Spark streaming, we can look into the configs.

shahiidiqbal commented 2 years ago

@nsivabalan We use Kafka Connect to get the changes (CDC) from MongoDB, and after that we use Spark streaming to read the topics from Kafka and store the data into our data lake through Hudi.

Here is the Spark streaming code to read the Kafka topic and then write the data through Hudi:

def write_hudi_table(df, epoch_id):
    # we do some cleansing here
    df.write.format('org.apache.hudi') \
        .options(**tableHudiOptions) \
        .mode('append') \
        .save(f'{HUDI_TABLE_BASE_PATH}/{TABLE_NAME}/')

rawDf = spark.readStream \
    .format("kafka") \
    .options(**kafkaOptions) \
    .option("subscribe", TOPIC_NAME) \
    .load() \
    .select(F.col("value").cast("string"))

query = finalDf.writeStream \
    .queryName(f"Writing Table {TABLE_NAME}") \
    .foreachBatch(write_hudi_table) \
    .option("checkpointLocation", f"{CHECKPOINT_BASE_PATH}/{TABLE_NAME}/checkpoint/") \
    .start()

query.awaitTermination()

shahiidiqbal commented 2 years ago

The compaction problem only occurs with foreachBatch; if we write the stream directly, it works perfectly. No idea whether it is a bug or expected behavior.

nsivabalan commented 2 years ago

This is directly using the Spark datasource write:

def write_hudi_table(df, epoch_id):
    # we do some cleansing here
    df.write.format('org.apache.hudi') \
        .options(**tableHudiOptions) \
        .mode('append') \
        .save(f'{HUDI_TABLE_BASE_PATH}/{TABLE_NAME}/')

and is not using HoodieStreamingSink, so this is expected. There are no async table services if you are directly using the Spark datasource write.

You might have to write to the stream directly (i.e. use the Hudi streaming sink via writeStream).
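For illustration, a minimal hedged sketch of writing the stream directly to Hudi's streaming sink (HoodieStreamingSink), which is what enables async table services for MOR. It reuses finalDf, tableHudiOptions and the path variables from the snippets above; the async-compaction option name assumes Hudi 0.9-era configs, and the cleansing step would have to move onto the streaming DataFrame itself:

# Sketch only: write through the Hudi streaming sink instead of foreachBatch,
# so async compaction can run inside the same streaming application.
query = finalDf.writeStream \
    .format("org.apache.hudi") \
    .options(**tableHudiOptions) \
    .option("hoodie.datasource.compaction.async.enable", "true") \
    .option("checkpointLocation", f"{CHECKPOINT_BASE_PATH}/{TABLE_NAME}/checkpoint/") \
    .outputMode("append") \
    .start(f"{HUDI_TABLE_BASE_PATH}/{TABLE_NAME}/")

query.awaitTermination()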

nsivabalan commented 2 years ago

Feel free to close the issue if you don't have any more questions/clarifications

srehman420 commented 2 years ago

What should be the approach in case you have multiple sinks, i.e. you split your incoming data into multiple Hudi tables? How would you store these without using foreachBatch?

nsivabalan commented 2 years ago

Yeah, we don't have a good solution at the moment. It might be foreachBatch as you are going with, but that does not use the streaming sink flow. The streaming sink (with async table services) in general works for one table at a time, so it may not be easy to retrofit it to write to multiple Hudi tables. We would have to design something properly.

Gatsby-Lee commented 2 years ago

@nsivabalan I have a question: if I run a separate Spark app for the async table compaction service, can it do the compaction for multiple tables?

Gatsby-Lee commented 2 years ago

@shahiidiqbal I don't know your data volume. If your volume is not too big, then try CoW. It's simpler, and there are fewer things you need to understand and optimize.

srehman420 commented 2 years ago

@Gatsby-Lee our data volume is approx 200-300 events per sec, but it mostly (up to 75%) consists of updates. This is why we are using a MOR table.

Is a COW table a fine choice with upserts in such a scenario?

nsivabalan commented 2 years ago

@Gatsby-Lee : We don't have that option at the moment, but we have an RFC and an active PR to support compaction/clustering for multiple tables: https://github.com/apache/hudi/pull/4309 https://github.com/apache/hudi/pull/4872

@glory9211 : If you don't mind taking a slight hit on your write latency in exchange for good read performance, you can go for COW. But if write speed is very critical, then you may want to stick with MOR.
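For reference, the switch would simply be the table-type option from the config block earlier in the thread (shown as an illustration; the remaining upsert settings stay the same):

'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',   # instead of 'MERGE_ON_READ'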

srehman420 commented 2 years ago

@nsivabalan For COW tables written using foreachBatch, will clustering work, or will it show the same behaviour as MOR compaction (i.e. clustering isn't triggered for COW tables with foreachBatch but works correctly when using the pure streaming APIs)?

Gatsby-Lee commented 2 years ago

@glory9211 Like @nsivabalan said, if your data latency is not critical, then CoW works as well. I use CoW for some datasets with incoming data of around 600-800 events/sec. I initially designed for MoR, but due to the async compaction issue in AWS Glue, I moved to CoW for now. If data latency is not a big concern, then try an increased window size; you can set the window size when you set up forEachBatch (see the sketch below). What I learned is that, as with big data in general, Hudi writes have high latency and high throughput, so rather than writing many small batches, try a bigger batch with fewer writes. I believe you can get some different ideas from that.
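A hedged illustration of that suggestion, assuming the micro-batch ("window") size is controlled with Spark structured streaming's processing-time trigger; the 5-minute interval is only an example, not a recommendation:

# Example only: a larger trigger interval means bigger micro-batches and fewer Hudi commits
query = finalDf.writeStream \
    .queryName(f"Writing Table {TABLE_NAME}") \
    .trigger(processingTime="5 minutes") \
    .foreachBatch(write_hudi_table) \
    .option("checkpointLocation", f"{CHECKPOINT_BASE_PATH}/{TABLE_NAME}/checkpoint/") \
    .start()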

@glory9211 Good question. Yes, I'd like to ask the same: is async clustering triggered or run in AWS Glue when it is enabled?

@nsivabalan Does async clustering have to run independently?

nsivabalan commented 2 years ago

@glory9211 : We don't have operational experience with Glue, unfortunately, and that's why we can't provide much help; I don't really know whether you are missing any specific configs or whether Glue streaming has limitations around this. But in general, with the streaming sink to Hudi, async compaction and async clustering work. I have tested it, and a few other users from the community have as well.

Another option you can try is inline clustering (https://hudi.apache.org/docs/configurations/#Clustering-Configs): set hoodie.clustering.inline = true plus the other configs for clustering strategies etc.
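As an illustration, a hedged sketch of inline clustering options added to the writer config (keys from the linked Clustering Configs page; the values are arbitrary examples):

'hoodie.clustering.inline': 'true',
'hoodie.clustering.inline.max.commits': 4,                               # example: cluster after every 4 commits
'hoodie.clustering.plan.strategy.small.file.limit': 629145600,           # example: files under ~600 MB are candidates
'hoodie.clustering.plan.strategy.target.file.max.bytes': 1073741824,     # example: target ~1 GB files after clustering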

nsivabalan commented 2 years ago

Alternatively, you can also try a separate async job: https://hudi.apache.org/docs/clustering/#asynchronous-clustering But do remember to enable a lock service provider.
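For illustration, a hedged sketch of the lock-provider settings that would typically accompany a separate async job (ZooKeeper is just one of the supported providers; the host, port and paths below are placeholders):

'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider',
'hoodie.write.lock.zookeeper.url': 'zk-host',             # placeholder
'hoodie.write.lock.zookeeper.port': '2181',
'hoodie.write.lock.zookeeper.lock_key': TABLE_NAME,
'hoodie.write.lock.zookeeper.base_path': '/hudi_locks',   # placeholder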

Gatsby-Lee commented 2 years ago

@nsivabalan I see. So, in Spark Streaming, Async Table services are expected to run.

Thank you

nsivabalan commented 2 years ago

Closing the github issue as the original question has been answered. Others: feel free to create new issue if you are looking for assistance. thanks!

Gatsby-Lee commented 2 years ago

For anyone who gets here because async compaction doesn't work with AWS Glue + Hudi + MoR tables: if you use the Spark write directly, async compaction won't work.

glue_context.write_dynamic_frame.from_options(
  frame=dyf,
  connection_type="marketplace.spark",
  connection_options=hudi_config,
)

menna224 commented 1 year ago

Hello @shahiidiqbal, can you please provide a snippet of the code in which you write the stream directly? How did you pass the cleansing function to it?

Gatsby-Lee commented 1 year ago

AWS Glue now has Glue 4.0 (with Hudi 0.12.1 support). I haven't tried it yet, but it might support MoR. https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-hudi.html

If anyone has tried it, please share your results with us.

abhisheksahani91 commented 1 year ago

@Gatsby-Lee I tried Glue 4.0; I used HoodieDeltaStreamer to read from Kafka and write to a Hudi MoR table.

I can see a *.compaction.requested file under the .aux folder, but the other compaction timeline states, INFLIGHT and COMPLETED, are missing. I don't think async compaction is working, even though my job is running in continuous mode.

abhisheksahani91 commented 1 year ago

@Gatsby-Lee Below are the files present in the .hoodie folder:

2023-06-12 20:03:53          0  20230612143340370.compaction.inflight
2023-06-12 20:03:43      12375  20230612143340370.compaction.requested
2023-06-12 20:12:54          0  20230612144240993.compaction.inflight
2023-06-12 20:12:44     106782  20230612144240993.compaction.requested
2023-06-13 02:08:46          0  20230612203831227.compaction.inflight
2023-06-13 02:08:34     106101  20230612203831227.compaction.requested

AWS CloudWatch logs:

23/06/12 20:38:44 INFO AsyncCompactService: Starting Compaction for instant [==>20230612203831227__compaction__REQUESTED]
23/06/12 20:51:30 INFO AsyncCompactService: Finished Compaction for instant [==>20230612203831227__compaction__REQUESTED]

From the logs, it looks like async compaction is working, but the completed compaction timeline files are missing.

Please help me understand how frequently async compaction will be requested, and also why only 3 async compaction requests have been triggered.

Gatsby-Lee commented 1 year ago

@abhisheksahani91 I haven't tried Glue 4.0 yet, and I also don't use MoR tables due to the compaction issue. BTW, I am just one of the Hudi users :D