apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Need a way to load specific data from the Hudi data lake #10852

Closed jayesh2424 closed 4 months ago

jayesh2424 commented 4 months ago

I have a Hudi data lake in AWS. Currently, for ETL operations I usually do a full load of the Hudi data lake. I want to know how I can read only a particular subset of the data from the Hudi data lake.

What I really want is something like create_dynamic_frame.from_options(), where we send a sample query to the database and fetch only a particular set of rows. In the same way, I want to send a SQL query to the Hudi data lake.

Rather than loading the data lake and then filtering, the main goal is to filter first and load only the relevant part of the data lake. It would be great if I could load this particular data with the help of Spark SQL.
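For context, the Glue pattern being referred to looks roughly like this; the connection type, connection name, and query below are purely illustrative (not from this thread), and option names can vary across Glue versions:

```python
from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Runs inside an AWS Glue job; illustrative values only.
glue_context = GlueContext(SparkContext.getOrCreate())

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="mysql",                     # hypothetical JDBC source
    connection_options={
        "useConnectionProperties": "true",
        "connectionName": "my-jdbc-connection",  # hypothetical Glue connection
        # Push a query to the source so only the matching rows are fetched.
        "sampleQuery": "select * from orders where created >= '2024-01-01'",
    },
)
```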

jayesh2424 commented 4 months ago

@xushiyan, @ad1happy2go and @codope could you please help me out with this?

ad1happy2go commented 4 months ago

@jayesh2424 Sorry, but I am not exactly clear on the question.

If you are asking how to read a specific part of the table: you can read the DataFrame and apply where/filter on it. If your dataset is partitioned and you filter on the partition column, Spark will read only the matching subdirectories (in case that is what you meant by "part of the data lake").

If you want to use SQL, you can register the DataFrame as a temporary view and query it: df.createOrReplaceTempView("temp_table") followed by spark.sql("select * from temp_table where a = 1").
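A minimal, runnable sketch of the suggestion above; the table path and the partition/filter column names are placeholders, not from this thread:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-read-example").getOrCreate()

target_path = "s3://my-bucket/hudi/my_table"  # placeholder table location

# Reading is lazy: nothing is scanned until an action runs.
df = spark.read.format("org.apache.hudi").load(target_path)

# DataFrame API: filtering on the partition column lets Spark read only the
# matching partition directories once an action is triggered.
subset = df.where("partition_col = 'some_value'")  # hypothetical partition column

# SQL on a temporary view, as suggested above.
df.createOrReplaceTempView("temp_table")
result = spark.sql("select * from temp_table where a = 1")
result.show()
```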

jayesh2424 commented 4 months ago

@ad1happy2go Okay, maybe the question was not clear. But what you have suggested is still a full load of the entire data lake into a DataFrame, and only then does df.createOrReplaceTempView("temp_table") allow further filtering. What you have proposed is an example of loading the data lake and then filtering it.

What I am saying is: filter the data lake and load only that part. For example, this is how we load the data lake: datalake_full_load = self.spark.read.format('org.apache.hudi').load(target_path)

Here we can use .select() to pull only particular columns, .filter(), and so on. What I want is something like: datalake_full_load = self.spark.read.format('org.apache.hudi').load(target_path).filter("select date(created) as created, count(*) as datalake_count from datalake group by date(created)")

My SQL might look odd, so don't take it word for word; I may want to achieve a similar result, but that exact query is not the point. I am just describing the window of data I want from the data lake.

Also, about the partitioning you mentioned: my data lake is partitioned, but not by date, and I want to select data by date. Here created is a timestamp column, so it is not used as a partition key in my data lake.
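For what it's worth, the aggregation sketched above can be expressed directly as DataFrame operations; a minimal sketch assuming a Hudi table at target_path with a timestamp column named created (names taken from the comment above), where column pruning means only created needs to be scanned:

```python
from pyspark.sql import functions as F

# Lazy read of the Hudi table; no data is scanned until an action runs.
df = spark.read.format("org.apache.hudi").load(target_path)

# Equivalent of:
#   select date(created) as created, count(*) as datalake_count
#   from datalake group by date(created)
daily_counts = (
    df.groupBy(F.to_date("created").alias("created"))
      .agg(F.count(F.lit(1)).alias("datalake_count"))
)

daily_counts.show()
```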

ad1happy2go commented 4 months ago

@jayesh2424 Spark uses lazy evaluation, so nothing runs until an action is called. For a partitioned table, when you read a DataFrame, filter on the partition column, and then call count, Spark will read only those partitions, not all of them.
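A quick way to see this, sketched below under the assumption of a partition column named partition_col (hypothetical name): explain() prints the physical plan, including the partition filters that will be pushed down, before any data is read.

```python
# Assumes `spark` and `target_path` from the earlier snippets and a
# hypothetical partition column named `partition_col`.
df = spark.read.format("org.apache.hudi").load(target_path)

pruned = df.filter("partition_col = '2024-03-01'")

# Still lazy: inspect the plan to see which partition filters are pushed down.
pruned.explain()

# Only the matching partition directories are read when the action runs.
print(pruned.count())
```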

For a non-partitioned table, or when the filter does not include a partition column clause, the entire table will normally be read, because Spark does not know in advance which files contain the data it is interested in; it has to at least read the metadata in the Parquet footers. That said, Hudi has indexes that can optimize these kinds of queries and read only a limited set of Parquet files.
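For reference, a hedged sketch of turning on Hudi data skipping on the read path so a predicate on a non-partition column such as created can prune files via column statistics; the exact configuration keys depend on the Hudi version, and the metadata table with column stats must have been built when the table was written:

```python
# Config keys below reflect recent Hudi releases and may differ in older
# versions; data skipping is only effective if the column stats index exists.
df = (
    spark.read.format("org.apache.hudi")
    .option("hoodie.metadata.enable", "true")
    .option("hoodie.enable.data.skipping", "true")
    .load(target_path)
)

# With column stats available, files whose min/max range for `created` does
# not overlap the predicate can be skipped instead of fully read.
recent = df.filter("created >= '2024-03-01 00:00:00'")
print(recent.count())
```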

https://medium.com/@think-data/mastering-lazy-evaluation-a-must-know-for-pyspark-pros-ac855202495e

ad1happy2go commented 4 months ago

@jayesh2424 Does the above answer your question? Do let us know, and please close the issue if you are all good here.

jayesh2424 commented 4 months ago

Thank you @ad1happy2go and @codope We can close this