apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[MetaData Indexing via Glue ETL] #8954

Open manishgaurav84 opened 1 year ago

manishgaurav84 commented 1 year ago

I am using Glue 3.0 to run metadata indexing for a Hudi table.

non_global_bloom_indexing = {
    'hoodie.table.name': tbl_name,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.write.recordkey.field': primary_key,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.table': tbl_name,
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.partition_fields': partition_fields,
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.HiveStylePartitionValueExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.SimpleKeyGenerator',
    'hoodie.datasource.write.partitionpath.field': partition_fields,
    'hoodie.upsert.shuffle.parallelism': 189,
    'hoodie.metadata.enable': 'true',
    'hoodie.metadata.index.bloom.filter.enable': 'true',
    'hoodie.metadata.index.column.stats.enable': 'true',
    'hoodie.metadata.index.column.stats.column.list': partition_fields,
    'hoodie.enable.data.skipping': 'true',
    'hoodie.index.type': 'BLOOM',
    'hoodie.bloom.index.use.metadata': 'true',
    'hoodie.bloom.index.filter.type': 'DYNAMIC_V0',
    'hoodie.bloom.index.use.caching': 'true',
    'hoodie.datasource.write.partitionpath.urlencode': 'true',
}

As expected, the following partitions are created inside the .hoodie/ folder:

bloom_filters/
column_stats/
files/

But the timeline files .indexing.requested, .indexing.inflight, and .indexing are not created.
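To make the check above repeatable, here is a minimal sketch that filters a listing of timeline file names for the three indexing suffixes mentioned in this issue. The function name, the sample instant times, and the idea of feeding it a pre-fetched listing (for example from an S3 listing of the .hoodie/ folder) are assumptions for illustration, not part of Hudi's API.

```python
# Suffixes of the indexing timeline files named in this issue.
INDEXING_SUFFIXES = (".indexing.requested", ".indexing.inflight", ".indexing")


def find_indexing_instants(timeline_files):
    """Return the timeline file names that belong to an indexing action.

    `timeline_files` is any iterable of file names taken from the table's
    .hoodie/ folder; how the listing is obtained is left to the caller.
    """
    return sorted(
        name for name in timeline_files
        if name.endswith(INDEXING_SUFFIXES)
    )


# Hypothetical listing of a .hoodie/ folder (instant times are made up).
sample_listing = [
    "20230613101500.commit",
    "20230613101500.commit.requested",
    "20230613101500.inflight",
    "hoodie.properties",
]

# An empty result matches the situation described in this issue.
print(find_indexing_instants(sample_listing))
```

If the returned list is empty after a write, no indexing instants were recorded on the timeline for that table path.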

ad1happy2go commented 1 year ago

@manishgaurav84 Can you post the command which you are using for metadata indexing?

manishgaurav84 commented 1 year ago

I am trying the inline indexing mode, like the one we have for clustering or compaction. The above options are passed in df.write.options. I assume this will kick off the data upsert and then the indexing.
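When debugging which indexing settings a write actually carries, it can help to print only the metadata-table options before passing the dict to df.write.options. The sketch below is a hypothetical helper (the function name and the trimmed sample dict with its placeholder table name are assumptions); it only filters keys by the hoodie.metadata. prefix and does not call Hudi or Spark.

```python
def metadata_options(write_options):
    """Return only the metadata-table settings from a Hudi write-options dict."""
    return {
        key: value
        for key, value in write_options.items()
        if key.startswith("hoodie.metadata.")
    }


# Trimmed copy of the options posted above; the table name is a placeholder.
options = {
    "hoodie.table.name": "my_table",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.bloom.filter.enable": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.index.type": "BLOOM",
}

# Print the metadata-related settings in a stable order for easy comparison.
for key, value in sorted(metadata_options(options).items()):
    print(f"{key} = {value}")
```

Comparing this output between two jobs shows at a glance which metadata indexing settings differ.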

ad1happy2go commented 1 year ago

Got it, I will try to triage this. Thanks for the quick response.

soumilshah1995 commented 1 year ago

Try these:

https://github.com/soumilshah1995/Advantages-of-Metadata-Indexing-and-Asynchronous-Indexing-in-Hudi-Hands-on-Lab


# Import only what the script uses. A missing module should fail here,
# not later as a NameError, so the imports are not wrapped in try/except.
import uuid

from awsglue.context import GlueContext
from awsglue.job import Job
from faker import Faker
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

# ================================= INSERTING DATA =====================================
# Module-level Faker instance shared by DataGenerator (no `global` needed here).
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                str(uuid.uuid4()),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
            ) for _ in range(10000)
        ]

# ============================== Settings =======================================
db_name = "hudidb"
table_name = "employees"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://XXXXXXXXXXX/hudi/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================

hudi_part_write_config = {
    'className': 'org.apache.hudi',

    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.schema.on.read.enable": "true",
    "hoodie.datasource.write.reconcile.schema": "true",

    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,

    "hoodie.clean.automatic": "false",
    "hoodie.clean.async": "false",
    "hoodie.clustering.async.enabled": "true",
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.async": "true",
    "hoodie.metadata.index.column.stats.enable": "true",
    "hoodie.metadata.index.check.timeout.seconds": "60",
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider",

}

for i in range(0, 5):
    data = DataGenerator.get_data()
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
    spark_df = spark.createDataFrame(data=data, schema=columns)
    spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

ad1happy2go commented 1 year ago

@manishgaurav84 Did you get a chance to try out the code shared by @soumilshah1995? Are you still facing this issue?

soumilshah1995 commented 1 year ago

I have tested it; it should work :D

ad1happy2go commented 1 year ago

@manishgaurav84 Did it work? Feel free to close this issue if you are good.