apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi Record Index not working as Expected: gives warning as "WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records" #10507

Closed zeeshan-media closed 4 months ago

zeeshan-media commented 7 months ago

Problem Detail:

I am trying out the Hudi record index on my machine. My PySpark job runs smoothly, and the data is written along with a record_index file in Hudi's metadata table, but the job logs the following warning: "WARN SparkMetadataTableRecordIndex: Record index not initialized so falling back to GLOBAL_SIMPLE for tagging records". Does this mean my record index is not working? I ask because for just 200 MB of Parquet data, it creates 30 files in the output directory.

Environment Description:

PySpark version: 3.3.0
Hudi version: 0.14.0

I have also tried this on EMR 6.15, with PySpark version 3.4.1 and Hudi version 0.14.0.

The warning is generated in the Hudi class SparkMetadataTableRecordIndex:

Code link: SparkMetadataTableRecordIndex.java

How to Reproduce

from faker import Faker
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

#..........................   Fake Data Generation ...........................
fake = Faker()
data = [{"ID": fake.uuid4(), "EventTime": fake.date_time(), 
         "FullName": fake.name(), "Address": fake.address(), 
         "CompanyName": fake.company(), "JobTitle": fake.job(), 
         "EmailAddress": fake.email(), "PhoneNumber": fake.phone_number(), 
         "RandomText": fake.sentence(), "City": fake.city(), 
         "State": fake.state(), "Country": fake.country()} for _ in range(1000)]
pandas_df = pd.DataFrame(data)

#.........................     Hoodie Properties  ............................

hoodie_properties = {
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
'hoodie.datasource.write.hive_style_partitioning': 'true',
'hoodie.datasource.write.recordkey.field': 'ID, State,City',
'hoodie.metadata.enable' : True,
'hoodie.table.name' : "record_index",
'hoodie.enable.data.skipping' : True,
"hoodie.index.type" : "RECORD_INDEX",
"hoodie.metadata.record.index.enable" : True,
'hoodie.datasource.write.precombine.field': 'EventTime',
'hoodie.payload.ordering.field': 'EventTime',
'hoodie.datasource.write.partitionpath.field': 'partition',
'hoodie.datasource.write.drop.partition.columns' : True

}

if __name__ == '__main__':
    # Adjust the jar paths in spark.jars to match your machine.
    # (A comment cannot follow a line-continuation backslash, so it lives here.)
    with SparkSession.builder.appName("hudi_record_index") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.jars", "/jars/hadoop-lzo.jar,/jars/hudi-spark3.3-bundle_2.12-0.14.0.jar") \
        .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.hudi.catalog.HoodieCatalog")\
        .config('spark.sql.extensions','org.apache.spark.sql.hudi.HoodieSparkSessionExtension')\
        .config("spark.hadoop.parquet.avro.write-old-list-structure", False) \
        .config("spark.sql.adaptive.enabled", False) \
        .config("spark.dynamicAllocation.enabled", True) \
        .getOrCreate() as spark:

        spark.sparkContext.setLogLevel("WARN")

        df = spark.createDataFrame(pandas_df)
        df = df.withColumn("partition",F.lit("record_index"))

        df.write.format("hudi").options(**hoodie_properties).mode("overwrite").save("your_output_file_path")

ad1happy2go commented 7 months ago

@zeeshan-media Thanks for raising this. I tried the code and found that on the first write to an empty table, Hudi logs this warning because record_index is not yet present in the metadata table (there is no data yet). It therefore falls back to GLOBAL_SIMPLE for tagging, which does not matter because there is no existing data to tag against.

I confirmed that on the next run it uses RECORD_INDEX properly (I checked in the Spark UI too), and the warning no longer appears.

fake = Faker()
data = [{"ID": fake.uuid4(), "EventTime": fake.date_time(),
         "FullName": fake.name(), "Address": fake.address(),
         "CompanyName": fake.company(), "JobTitle": fake.job(),
         "EmailAddress": fake.email(), "PhoneNumber": fake.phone_number(),
         "RandomText": fake.sentence(), "City": fake.city(),
         "State": fake.state(), "Country": fake.country()} for _ in range(1000)]
pandas_df = pd.DataFrame(data)

hoodie_properties = {
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.write.recordkey.field': 'ID, State,City',
    'hoodie.metadata.enable' : True,
    'hoodie.table.name' : "record_index",
    'hoodie.enable.data.skipping' : True,
    "hoodie.index.type" : "RECORD_INDEX",
    "hoodie.metadata.record.index.enable" : True,
    'hoodie.datasource.write.precombine.field': 'EventTime',
    'hoodie.payload.ordering.field': 'EventTime',
    'hoodie.datasource.write.partitionpath.field': 'partition',
    'hoodie.datasource.write.drop.partition.columns' : True

}
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(pandas_df)
df = df.withColumn("partition",F.lit("record_index"))
df.write.format("hudi").options(**hoodie_properties).mode("overwrite").save(PATH)
spark.read.options(**hoodie_properties).format("hudi").load(PATH).show()
df.withColumn("City", F.lit("updated_city")).write.format("hudi").options(**hoodie_properties).mode("append").save(PATH)
spark.read.options(**hoodie_properties).format("hudi").load(PATH).show()
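
One way to double-check the behaviour described above, without relying on the log message, is to look for the record_index partition inside the table's metadata directory after each write. The helper below is a minimal sketch, not part of Hudi: the function name `record_index_initialized` is hypothetical, it assumes the table lives on a local filesystem (not S3 or HDFS), and it assumes the metadata table sits under `.hoodie/metadata` with one sub-directory per index partition.

```python
from pathlib import Path


def record_index_initialized(table_path: str) -> bool:
    """Return True if <table_path>/.hoodie/metadata/record_index holds files.

    Hypothetical helper for local paths only; the directory layout is an
    assumption about how the metadata table is stored on disk.
    """
    partition = Path(table_path) / ".hoodie" / "metadata" / "record_index"
    # Treat the index as present only if the directory exists and is non-empty.
    return partition.is_dir() and any(p.is_file() for p in partition.iterdir())
```

Calling `record_index_initialized(PATH)` after the first write and again after the second should show whether the partition was created by the initial commit.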

zeeshan-media commented 7 months ago

@ad1happy2go Does that mean the record index is not used the first time we run the job? I ask because it creates 30 files (about 7 MB each) for approximately 210 MB (3 million records) of Parquet data, which hurts read performance.

ad1happy2go commented 7 months ago

@zeeshan-media The Hudi upsert path tags incoming records against the existing Hudi data to find out which records are updates. On the first write this tagging does not use RECORD_INDEX, which does not matter because there is no existing data. So there is no issue here apart from the warning message, which ideally we should not show.

A read of a plain Parquet table cannot leverage any index or other Hudi feature anyway; that is more like source data. I hope this clears up the doubt.

Understand more here - https://hudi.apache.org/docs/write_operations#writing-path
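
To make the tagging step concrete, here is a toy sketch of the idea in plain Python. It is not Hudi's implementation: the index is modelled as an ordinary dict from record key to file ID, and the names `tag_records` and `split_inserts_updates` are hypothetical. It only illustrates why the choice of index is irrelevant on the first write: with no existing data, every lookup misses and every record is an insert.

```python
from typing import Dict, Iterable, List, Optional, Tuple


def tag_records(
    incoming_keys: Iterable[str], index: Dict[str, str]
) -> List[Tuple[str, Optional[str]]]:
    """Pair each incoming key with its known file ID, or None if unseen."""
    return [(key, index.get(key)) for key in incoming_keys]


def split_inserts_updates(
    tagged: List[Tuple[str, Optional[str]]]
) -> Tuple[List[str], List[str]]:
    """Keys without a location are inserts; keys with one are updates."""
    inserts = [key for key, location in tagged if location is None]
    updates = [key for key, location in tagged if location is not None]
    return inserts, updates


# First write: the index is empty, so everything is an insert.
first = split_inserts_updates(tag_records(["a", "b"], {}))

# Later write: "a" is already known, so it is tagged as an update.
later = split_inserts_updates(tag_records(["a", "c"], {"a": "file-1", "b": "file-1"}))
```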

zeeshan-media commented 7 months ago

@ad1happy2go By "Parquet table" I meant the Hudi output directory, since its data files are in Parquet format. When I first write my data in overwrite mode, Hudi creates 30 files per partition. The record index should not produce such a large number of files, should it?

Additionally, the Hudi record index is supposed to be faster than other indexes as far as reads are concerned. When I applied bucket indexing to the same data, queries were faster than against the files written with the record index. With the bucket index I had 4 buckets, since I can control that number, but with the record index there were 30 output files. This must have hurt query performance, right?

ad1happy2go commented 7 months ago

@zeeshan-media In the first write, what is the size of those 30 files? The number of files should not depend on RLI (the record-level index) anyway. Ideally, small-file handling should take place in the upsert path and create properly sized partitions. It works a little differently from the bucket index with the Flink writer.

zeeshan-media commented 7 months ago

@ad1happy2go Each file is precisely 7.2 MB. I used Amazon EMR with PySpark version 3.4.1. I am using 4 fields as the record key in my job. Can the number of record key fields affect the number of files being created?

ad1happy2go commented 7 months ago

@zeeshan-media Just to be sure, the data files are 7.2 MB each? The number of record keys affects the size of the record_index, not the size of the data.

Ideally, small files should be merged into bigger files. I will try your code in my local setup.
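
For anyone reproducing this, the file count and sizes per partition can be collected with a short script rather than read off a directory listing. The following is a minimal sketch under stated assumptions: the table is on a local filesystem, data files end in `.parquet`, and everything under `.hoodie` is skipped so that metadata-table files are not counted. The function name `data_file_sizes_mb` is hypothetical.

```python
import os
from collections import defaultdict
from typing import Dict, List


def data_file_sizes_mb(table_path: str) -> Dict[str, List[float]]:
    """Map each partition (path relative to the table) to its data-file sizes in MiB."""
    sizes: Dict[str, List[float]] = defaultdict(list)
    for root, dirs, files in os.walk(table_path):
        # Prune the .hoodie directory so timeline and metadata files are ignored.
        dirs[:] = [d for d in dirs if d != ".hoodie"]
        for name in files:
            if name.endswith(".parquet"):
                partition = os.path.relpath(root, table_path)
                size_mb = os.path.getsize(os.path.join(root, name)) / (1024 * 1024)
                sizes[partition].append(round(size_mb, 2))
    return dict(sizes)
```

Comparing `len(sizes)` and `sum(sizes)` per partition across runs (for example, with and without the record index enabled) would show whether the index setting changes the file layout at all.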

zeeshan-media commented 7 months ago

Yes, they were 7.2 MB each. I was using COW mode. I did not use Faker data for that run; it was real data with 3 million records. The configurations were the same; the only difference was that I had enabled Hudi's column-stats property.

ad1happy2go commented 7 months ago

@zeeshan-media If I understand you correctly, with column stats you got properly sized files, but with RLI you are getting small files. Can you message me on Slack when you see this? We can have a quick call about it. Thanks.