apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] How to run Periodic Compaction? Multiple Tables - When no Upserts #2151

Closed tandonraghav closed 4 years ago

tandonraghav commented 4 years ago

I have a use case where Mongo oplogs are ingested into a Kafka topic via Debezium.

These oplogs come from N collections. Since Hudi doesn't support inserting into multiple tables from a single DataSource write, I am taking these oplogs (Spark + Kafka DStreams), bucketing them by collection name, and then inserting them into Hudi tables (N Hudi tables corresponding to N collections).

Expected Behaviour:

Code Snippet (Spark Streaming)

// Read from Kafka
List<Row> db_names = kafkaDf.select("source.ns").distinct().collectAsList();

for (int i = 0; i < db_names.size(); i++) {
    String dbName = db_names.get(i).getString(0);   // collection namespace for this bucket
    Dataset<Row> ds = kafkaDf.select("*").where(kafkaDf.col("source.ns").equalTo(dbName));
    // Few other transformations
    persistDFInHudi(ds, sanitizedDBName, tablePath); // sanitizedDBName and tablePath are derived from dbName (elided)
}

private void persistDFInHudi(Dataset<Row> ds, String dbName, String tablePath) {
    ds.write().format("org.apache.hudi")
            .options(QuickstartUtils.getQuickstartWriteConfigs())
            .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
            .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
            .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "ts_ms")
            .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_id")
            .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(), MergeHudiPayload.class.getName())
            .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "db_name")
            .option(HoodieWriteConfig.TABLE_NAME, dbName)
            .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, true)
            .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true")
            .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), dbName)
            .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "db_name")
            .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), "false") // was DEFAULT_..._OPT_VAL(), which is the default value, not the option key
            .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 1)
            .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY(), false)
            .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), MultiPartKeysValueExtractor.class.getName())
            .mode(SaveMode.Append)
            .save(tablePath);
}

Also, I tried to use:

            Option<String> instantTime = client.scheduleCompaction(Option.empty()); // How to get the commit instant time?
            JavaRDD<WriteStatus> writeStatuses = client.compact(instantTime.get());
            client.commitCompaction(instantTime.get(), writeStatuses, Option.empty());
bvaradar commented 4 years ago

@tandonraghav : the correct setup would be to keep a separate Hudi table for each Mongo collection, as the schema could differ across collections. You should have Debezium route the CDC for each collection to its own dedicated Kafka topic and have Hudi read from the separate topics to build them.

Have you looked at DeltaStreamer in Hudi (continuous mode)? That tool is tailor-made for the use case you are describing.

tandonraghav commented 4 years ago

@bvaradar Mongo Collection-> Hudi Table (This is our setup also)

            Option<String> instantTime=client.scheduleCompaction(Option.empty());
            client.compact(instantTime.get());

But if this code fails somehow, how do I get the last compaction instantTime so that I can use it to compact again?

Regarding DeltaStreamer, I guess it works with only one table; I have not experimented with MultiDeltaStreamer.

bvaradar commented 4 years ago

@tandonraghav : Regarding your question on compaction: since you are using WriteClient-level APIs, HoodieTable.getHoodieView().getPendingCompactionOperations().map(Pair::getKey).distinct() will give you the pending compaction timestamps.

Regarding the setup, hundreds of topics in Kafka should be fine and is not an anti-pattern. Curious how you manage the various schemas corresponding to each Mongo collection when writing to a single Kafka topic. Are you using Avro encoding and a schema registry?
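
For reference, a minimal sketch of listing the pending compaction instants directly from the table timeline (assuming Hudi 0.6.x APIs; tablePath is the table base path from the snippets above, and the HoodieTableMetaClient constructor shown here is illustrative):

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;

// Collect the timestamps of compactions that have been scheduled but not yet completed.
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), tablePath);
List<String> pendingCompactionInstants = metaClient.getActiveTimeline()
        .filterPendingCompactionTimeline()
        .getInstants()
        .map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());
// Each timestamp can then be passed to client.compact(...) and client.commitCompaction(...).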

tandonraghav commented 4 years ago

@bvaradar Thanks for the answer. Below is how our setup looks:

Again, I want to stress that we save via a Spark DataFrame. We do not want to compact inline due to the volume of records, but we are compacting via HoodieClient in a different Spark job, for multiple tables, every X min, as I am not able to find a way to run only compaction using a Spark DF.

Do you see any issue in saving via DF and concurrently running compaction via HoodieClient?

Very soon we will be running perf tests of our setup with Hudi. Will keep you posted.

bvaradar commented 4 years ago

@tandonraghav : It should work as is with 0.6.0. You should be able to run spark.write() with inline compaction off; based on the compaction schedule, this write will schedule compactions. You can then use your writeClient code to run async compactions.

Just so that you are aware of everything: note that inline compaction does not need to run every single time you ingest data. You can set it to run every N commits, but it will be inline when it runs (it blocks writing).

We usually have folks running async compaction in delta-streamer continuous mode and, recently, in structured streaming. Async compaction with a spark DF write or with deltastreamer in run-once mode is generally not done, as users need to set up a separate compaction job. Let me open a JIRA to make it easier to run compaction alone using spark.write()...
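
To make that concrete, here is a minimal sketch of such a separate compaction-only job, stitched together from the WriteClient calls already shown in this thread (assuming Hudi 0.6.x; jsc, tablePath and dbName come from the surrounding job, and additional write configs such as the schema are omitted):

import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;

// Separate Spark job that only compacts: schedule a compaction if the delta commits warrant one,
// then execute and commit it.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(tablePath)   // base path of the MOR table written by the streaming job
        .forTable(dbName)      // must match the table name used by the writer
        .build();
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
try {
    Option<String> instantTime = client.scheduleCompaction(Option.empty());
    if (instantTime.isPresent()) {
        JavaRDD<WriteStatus> writeStatuses = client.compact(instantTime.get());
        client.commitCompaction(instantTime.get(), writeStatuses, Option.empty());
    }
} finally {
    client.close();
}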

bvaradar commented 4 years ago

@tandonraghav : I just edited the comment above after reviewing the 0.6.0 code. Please re-read it to avoid any confusion.

bvaradar commented 4 years ago

FYI : https://issues.apache.org/jira/browse/HUDI-1329

tandonraghav commented 4 years ago

@bvaradar Yes, I am aware that compaction need not run every time and can run after X delta commits.

But let's say I have done X-Y delta commits and then there are no writes on the table for a few minutes. How will you then get fresh data from _ro? (We are using Presto, so *_rt is not an option.)

DeltaStreamer is not an option, as we have multiple tables in the same topic; I believe the streamer will not work for such a setup.

tandonraghav commented 4 years ago

@bvaradar And you are saying that in 0.6.0, if I have set hoodie.compact.inline=false and hoodie.compact.inline.max.delta.commits=2, then the moment there are 2 delta commits it will schedule a compaction on the timeline, and then we can query the compaction timeline and do all the compactions? Is this understanding correct?

And what about the situation where we have only 1 delta commit for a few minutes, but we want fresh data in _ro after X minutes?

bvaradar commented 4 years ago

@tandonraghav : Can you elaborate on your expected event rate and your data-freshness SLA expectations? It's possible that a Copy-On-Write table would be good enough.

To your other question on data freshness: yes, your understanding is correct. If you have sporadic incoming changes and a tighter data-freshness SLA, you can set hoodie.compact.inline.max.delta.commits=1 and run compactions at an interval corresponding to your data-freshness requirement.
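
For instance, the compaction-only job sketched earlier could be driven on a fixed interval matched to the freshness requirement (a sketch using plain java.util.concurrent scheduling; tablesToCompact and runCompactionForTable are hypothetical placeholders wrapping the scheduleCompaction/compact/commitCompaction calls above):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Run the compaction-only job for every table on a 15-minute cadence to match the freshness SLA.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
    for (String tableBasePath : tablesToCompact) {   // hypothetical: one base path per Hudi table
        runCompactionForTable(tableBasePath);        // hypothetical wrapper around the WriteClient calls
    }
}, 0, 15, TimeUnit.MINUTES);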

tandonraghav commented 4 years ago

@bvaradar SLA ~ 15 min for fresh data... and we can have around 10-30M updates per client, but this varies from client to client. We want to test for 50M per client, and there can be X clients like that...

So, COW is not an option.

Also, I just tried with this config on Hudi 0.6.0:

ds.write().format("org.apache.hudi")
        .options(QuickstartUtils.getQuickstartWriteConfigs())
        //.option("hoodie.avro.schema", SchemaUtils.schemaMapper.get(dbName))
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "ts_ms")
        //.option("hoodie.combine.before.upsert", false)
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_id")
        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY(), MergeHudiPayload.class.getName())
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "db_name")
        .option(HoodieWriteConfig.TABLE_NAME, dbName)
        .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false)               // changed: inline compaction off
        .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true")
        .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), dbName)
        .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "db_name")
        .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), "false")
        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 1) // changed: compact after every delta commit
        .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY(), false)
        .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), MultiPartKeysValueExtractor.class.getName())
        .mode(SaveMode.Append)
        .save(tablePath);

And I don't see any compaction.requested in the timeline (I checked inside the folders). Am I missing something?

-rw-r--r--  1 raghav  wheel     0B Oct  9 14:33 20201009143322.deltacommit.requested
-rw-r--r--  1 raghav  wheel   1.6K Oct  9 14:33 20201009143322.deltacommit.inflight
-rw-r--r--  1 raghav  wheel   1.9K Oct  9 14:33 20201009143322.deltacommit
-rw-r--r--  1 raghav  wheel     0B Oct  9 14:34 20201009143417.deltacommit.requested
-rw-r--r--  1 raghav  wheel   1.6K Oct  9 14:34 20201009143417.deltacommit.inflight
-rw-r--r--  1 raghav  wheel   1.9K Oct  9 14:34 20201009143417.deltacommit
bvaradar commented 4 years ago

Can you cat the contents of 20201009143322.deltacommit and 20201009143417.deltacommit?

tandonraghav commented 4 years ago

@bvaradar I think the issue was option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY(), false). If I set this to true as well, it creates the compaction.requested.
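
So, with the spark DF writer, the compaction-related options that end up producing compaction.requested look roughly like this (a sketch of just those options, assuming Hudi 0.6.x; the rest of the writer chain is unchanged from the earlier snippet):

// Only the compaction-related options; everything else stays as in the earlier writer snippet.
.option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false)                 // do not block the write with inline compaction
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 1)   // schedule a compaction after every delta commit
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY(), true)       // per the observation above, needed for compaction scheduling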

tandonraghav commented 4 years ago

@bvaradar Thanks for helping here. But the issue is still not solved.

The freshness of data is becoming a big concern. Probably I am missing something here.

The problem remains the same: if I have delta.commits=5 and only 3 delta commits have happened and I run compaction, then I will not get fresh data, and the compaction request will not even kick in.

How do I solve this problem?

bvaradar commented 4 years ago

@tandonraghav : It is by design that a "file" which is already pending compaction is not scheduled for another compaction until that compaction is done.

Another knob is the strategy for selecting files when scheduling compaction, which is pluggable. For example, you can implement your own CompactionStrategy to prioritize files belonging to "hot" partitions and to keep the number of files per compaction small.

Basically, you need to run compactions at a higher frequency and keep delta.commits=1 if you are trying to optimize for data freshness but want to use read-optimized queries.
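
To illustrate the pluggable-strategy knob mentioned above: if the standalone compaction job builds its HoodieWriteConfig explicitly, a custom strategy could be wired in roughly as below (HotPartitionFirstCompactionStrategy is a hypothetical placeholder class that would extend Hudi's CompactionStrategy; builder method names assume Hudi 0.6.x):

// Sketch: plugging a (hypothetical) custom CompactionStrategy into the compaction job's write config.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(tablePath)
        .forTable(dbName)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .withInlineCompaction(false)                                        // compaction runs in the separate job
                .withMaxNumDeltaCommitsBeforeCompaction(1)                          // schedule after every delta commit for freshness
                .withCompactionStrategy(new HotPartitionFirstCompactionStrategy())  // placeholder: prioritize "hot" partitions, cap files per plan
                .build())
        .build();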

tandonraghav commented 4 years ago

@bvaradar Thanks for the update.