apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] select lots of values via Record Index #11438

Open ziudu opened 3 months ago

ziudu commented 3 months ago

Dear all, I think the record index is an amazing feature. However, I ran into a problem when selecting a large number of records:

Background:

```python
hudi_table_path = "/ilw/test/..."
updated_sub_df = spark.read.format("hudi") \
    .option("hoodie.metadata.enable", "true") \
    .option("hoodie.metadata.record.index.enable", "true") \
    .option("hoodie.enable.data.skipping", "true") \
    .load(hudi_table_path)
```

Method 1: Record Index and file pruning work, but if the list is huge (1 million values, for example) Spark hangs, I guess at the Catalyst Optimizer step, since the list is converted into a SQL predicate before the physical plan is executed.

```python
values = list(range(1, 10001))
query_df = updated_sub_df.filter(updated_sub_df.new_id.isin(values))
print(query_df.count())
```

Method 2: we searched on Google and found a way to broadcast the list with a UDF, as follows. The Catalyst Optimizer worked fine, but the Record Index and file pruning were not used.

```python
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

values = list(range(1, 20001))
broadcast_values = spark.sparkContext.broadcast(values)
isin_values_udf = F.udf(lambda x: x in broadcast_values.value, BooleanType())
query_df = updated_sub_df.where(isin_values_udf(updated_sub_df['new_id']))
print("Count values: ", query_df.count())
```

Is there any way to select a large number of values via the Record Index?

ad1happy2go commented 3 months ago

@ziudu This may not be the right way to use it, I guess. Having 1 million values in an IN clause will create problems in the Spark SQL planning phase anyway.

ziudu commented 3 months ago

With a broadcast variable in Method 2 there is no Spark SQL planning issue. [screenshot]

Method 1 is definitely not good: even with 20K values the planning takes 1.5 minutes, and 1 million values would kill the Catalyst Optimizer outright. [screenshot]

I'm looking for a way to select millions of records from a very large table (billions of records) via the Record Index. RLI's performance is amazing for thousands of records, so I want to do some stress testing.
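One workaround worth testing, sketched below as an assumption rather than something confirmed in this thread: split the key list into chunks small enough for Catalyst to plan quickly, filter each chunk with a literal `isin()` predicate (which RLI and file pruning can in principle still use), and union the per-chunk results.

```python
from functools import reduce

# Sketch of a chunked isin() workaround (assumed, not verified at scale):
# each chunk remains a literal IN predicate that Catalyst plans quickly
# and that the Record Index can still use for point lookups.
CHUNK_SIZE = 10_000
chunks = [values[i:i + CHUNK_SIZE] for i in range(0, len(values), CHUNK_SIZE)]
parts = [updated_sub_df.filter(updated_sub_df.new_id.isin(chunk)) for chunk in chunks]
# Caveat: with very many chunks, the unioned plan itself grows large again.
query_df = reduce(lambda left, right: left.unionByName(right), parts)
print(query_df.count())
```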

ad1happy2go commented 3 months ago

@ziudu When using Method 2, since it goes through a UDF, it won't benefit from RLI: the optimizer can't see inside the UDF, so it can't plan the record-key lookup.
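A quick way to see the difference (a sketch; the exact plan text varies by Spark version) is to compare the physical plans of the two filter styles:

```python
# The isin() filter surfaces as a Catalyst In(...) predicate, which the
# Hudi relation can inspect to drive RLI lookups and file pruning.
updated_sub_df.filter(updated_sub_df.new_id.isin([1, 2, 3])).explain(True)

# The UDF filter shows up as an opaque Python UDF evaluation; the data
# source only sees "filter by some function" and cannot push anything down.
updated_sub_df.where(isin_values_udf(updated_sub_df['new_id'])).explain(True)
```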

ziudu commented 3 months ago

Is it possible to add some "case UDF" branch here to support this feature? https://github.com/apache/hudi/blob/8eed6aefbc4de9ea5426fefa296c0723bbc3fb14/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala#L102

ziudu commented 3 months ago

I saw someone suggesting to "Add a filter in Spark (I wrote something at https://issues.apache.org/jira/browse/SPARK-31417) which allows broadcasted pushdown all the way to your relation provider."

https://stackoverflow.com/questions/61111172/scala-spark-isin-broadcast-list
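For reference, the usual alternative discussed in that Stack Overflow thread is a broadcast join, sketched below (assuming `new_id` is the lookup column). Catalyst plans it without trouble, but like the UDF it is a join rather than a literal predicate, so it would not trigger RLI or file pruning either:

```python
from pyspark.sql import functions as F

# Build a small DataFrame from the key list and broadcast-join it against
# the Hudi table; the broadcast hint keeps the join map-side.
keys_df = spark.createDataFrame([(v,) for v in values], ["new_id"])
query_df = updated_sub_df.join(F.broadcast(keys_df), on="new_id", how="inner")
print(query_df.count())
```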

ad1happy2go commented 3 months ago

@ziudu Thanks for these insights. This could be a useful feature for RLI. Adding @beyond1920 @nsivabalan @yihua here too

ad1happy2go commented 3 months ago

Created a JIRA for this: https://issues.apache.org/jira/browse/HUDI-7910. @ziudu Feel free to contribute in case you have a fix.