apache / incubator-xtable

Apache XTable (incubating) is a cross-table converter for lakehouse table formats that facilitates interoperability across data processing systems and query engines.
https://xtable.apache.org/
Apache License 2.0

AWS Glue Hudi to ICEBERG Tables Fails #362

Closed soumilshah1995 closed 6 months ago

soumilshah1995 commented 7 months ago

Hello, I'm trying to translate Hudi metadata into Iceberg. I was able to do Hudi to Delta with this config:

sourceFormat: HUDI
targetFormats:
  - DELTA
datasets:
  -
    tableBasePath: s3://soumil-dev-bucket-1995/silver/table_name=orders/
    tableName: orders

The config above works. The following one for Iceberg fails:

sourceFormat: HUDI
targetFormats:
  - ICEBERG
datasets:
  -
    tableBasePath: s3://soumil-dev-bucket-1995/silver/table_name=orders/
    tableName: orders

Hudi version: 0.12

Spark version: 3.3.0-amzn-1

Java version:

sh-4.2$ java -version
openjdk version "1.8.0_392"
OpenJDK Runtime Environment Corretto-8.392.08.1 (build 1.8.0_392-b08)
OpenJDK 64-Bit Server VM Corretto-8.392.08.1 (build 25.392-b08, mixed mode)

I see the following error:


sh-4.2$ java -jar  ./utilities-0.1.0-beta1-bundled.jar --dataset ./my_config.yaml
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
2024-03-02 14:06:30 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumil-dev-bucket-1995/silver/table_name=orders/ for following table formats [ICEBERG]
2024-03-02 14:06:32 INFO  io.onetable.client.OneTableClient:264 - No previous OneTable sync for target. Falling back to snapshot sync.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
2024-03-02 14:06:35 ERROR io.onetable.spi.sync.TableFormatSync:61 - Failed to sync snapshot
java.lang.IllegalArgumentException: Cannot add field order_id as an identifier field: not a required field
        at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at org.apache.iceberg.Schema.validateIdentifierField(Schema.java:126) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at org.apache.iceberg.Schema.lambda$new$0(Schema.java:106) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_392]
        at org.apache.iceberg.Schema.<init>(Schema.java:106) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at org.apache.iceberg.Schema.<init>(Schema.java:91) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at org.apache.iceberg.Schema.<init>(Schema.java:83) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at io.onetable.iceberg.IcebergSchemaExtractor.toIceberg(IcebergSchemaExtractor.java:90) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at io.onetable.iceberg.IcebergClient.initializeTableIfRequired(IcebergClient.java:125) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at io.onetable.iceberg.IcebergClient.beginSync(IcebergClient.java:113) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at io.onetable.spi.sync.TableFormatSync.getSyncResult(TableFormatSync.java:107) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at io.onetable.spi.sync.TableFormatSync.syncSnapshot(TableFormatSync.java:54) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at io.onetable.client.OneTableClient.lambda$syncSnapshot$4(OneTableClient.java:167) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at java.util.HashMap.forEach(HashMap.java:1290) ~[?:1.8.0_392]
        at io.onetable.client.OneTableClient.syncSnapshot(OneTableClient.java:165) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at io.onetable.client.OneTableClient.sync(OneTableClient.java:122) ~[utilities-0.1.0-beta1-bundled.jar:?]
        at io.onetable.utilities.RunSync.main(RunSync.java:162) ~[utilities-0.1.0-beta1-bundled.jar:?]
2024-03-02 14:06:35 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG]
sh-4.2$ 
ForeverAngry commented 7 months ago

Hi! I think you have to define the partition spec when the source is Hudi, right?
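
For reference, a partitioned Hudi source can be described to XTable with a partitionSpec entry per dataset. A minimal sketch, mirroring the config that appears later in this thread (the bucket placeholder, the state field, and the VALUE transform are just examples; adjust them to your table):

sourceFormat: HUDI
targetFormats:
  - ICEBERG
datasets:
  -
    tableBasePath: s3://<your-bucket>/silver/table_name=customers/
    tableName: customers
    partitionSpec: state:VALUE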

soumilshah1995 commented 7 months ago

The tables are not partitioned.

The sync ran fine for Delta tables; it only fails for Iceberg.

the-other-tim-brown commented 7 months ago

This is an issue with the Hudi table's record key field not being required (the field is likely nullable in the Hudi metadata). You can try rewriting the Hudi table with the record key field as a required (non-nullable) field.

Adding this to track our options for improvements in handling this case: https://github.com/apache/incubator-xtable/issues/366

the-other-tim-brown commented 7 months ago

@soumilshah1995 this is actually fixed in the code on main, which version are you running?

soumilshah1995 commented 7 months ago

I was using the jar provided on the GitHub page, utilities-0.1.0-beta1-bundled.jar. Is that not the jar I should be using?

the-other-tim-brown commented 7 months ago

You can use that jar, but it does not contain the latest code with the bug fix. If you want to use the 0.1.0-beta1 jar, you'll need to rewrite your source table so that the record key column is a required field.

soumilshah1995 commented 7 months ago

I want to kindly assure you that the record_key field is indeed not null. When you emphasize that record_key is required, it implies that writing to the Hudi table isn't possible if the keys are null. However, I'm uncertain if I fully grasp your point. Could you please provide further clarification?

the-other-tim-brown commented 7 months ago

Hudi can write a column with no null values but still list the field as nullable in its schema. By default, I believe all fields written from Spark are listed as nullable. If the schema says the field is nullable, XTable will treat it as a nullable field, because that is what the Hudi schema and the underlying parquet files declare.

You can check this by inspecting the Hudi commit metadata.
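
A rough sketch of one way to do that, assuming the commit files under .hoodie/ are JSON and carry the table's Avro schema under extraMetadata.schema (those key names and the boto3 approach are assumptions, not verified against this exact Hudi version):

import json
import boto3

bucket = "soumil-dev-bucket-1995"               # from the config in this thread
prefix = "silver/table_name=orders/.hoodie/"    # table base path + .hoodie

s3 = boto3.client("s3")
objects = s3.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", [])
commits = sorted(o["Key"] for o in objects if o["Key"].endswith(".commit"))

# Read the latest commit file and pull out the writer schema it recorded.
body = s3.get_object(Bucket=bucket, Key=commits[-1])["Body"].read()
commit_meta = json.loads(body)
schema = json.loads(commit_meta["extraMetadata"]["schema"])

# Nullable fields show up as an Avro union with "null"; the record key
# (order_id here) should not be one of them if Iceberg is to accept it
# as an identifier field.
for field in schema["fields"]:
    print(field["name"], field["type"])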

soumilshah1995 commented 7 months ago

While writing data, do I need to specify a schema like this?

schema = StructType([
    StructField("customer_id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),  # Example of nullable field
    StructField("state", StringType(), nullable=False),
    StructField("city", StringType(), nullable=False),
    StructField("email", StringType(), nullable=False),
    StructField("created_at", TimestampType(), nullable=False),
    StructField("address", StringType(), nullable=False)
])

Should this fix the issue?

the-other-tim-brown commented 7 months ago

@soumilshah1995 you would need to incorporate that schema into your writer. One way I've set the schema on the writer in the past is with the hoodie.write.schema option, which takes an Avro schema as a string.
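
A minimal sketch of what that could look like, assuming a customers table like the one later in this thread; the Avro schema below is hand-written (required fields use a plain type, nullable ones a union with "null") and the bucket path is a placeholder:

import json

customers_avro_schema = {
    "type": "record",
    "name": "customers",
    "fields": [
        {"name": "customer_id", "type": "string"},   # record key, non-nullable
        {"name": "name", "type": ["null", "string"], "default": None},
        {"name": "state", "type": "string"},
        {"name": "city", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "created_at", "type": "string"},
        {"name": "address", "type": "string"},
    ],
}

hudi_options = {
    "hoodie.table.name": "customers",
    "hoodie.datasource.write.recordkey.field": "customer_id",
    "hoodie.datasource.write.precombine.field": "created_at",
    # Avro schema passed as a string, as described above.
    "hoodie.write.schema": json.dumps(customers_avro_schema),
}

(spark_df_customers.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3://<bucket>/silver/table_name=customers/"))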

soumilshah1995 commented 7 months ago

Roger that, I'll try this and update you soon.

soumilshah1995 commented 6 months ago

Just wanted to update you: I didn't get a chance to try it today. I'll try it during my free time soon and post an update here on GitHub.

soumilshah1995 commented 6 months ago

Tests

""""
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}

%pip install Faker


"""

try:
    import sys, random, uuid
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from datetime import datetime, date
    import boto3, pandas
    from functools import reduce
    from pyspark.sql import Row
    from faker import Faker
except Exception as e:
    print("Modules are missing : {} ".format(e))

job_start_ts = datetime.now()
ts_format = '%Y-%m-%d %H:%M:%S'

spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()

global faker
faker = Faker()

def get_customer_data(total_customers=2):
    customers_array = []
    for i in range(0, total_customers):
        customer_data = {
            "customer_id": str(uuid.uuid4()),
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "created_at": datetime.now().isoformat().__str__(),
            "address": faker.address(),

        }
        customers_array.append(customer_data)
    return customers_array

def get_orders_data(customer_ids, order_data_sample_size=3):
    orders_array = []
    for i in range(0, order_data_sample_size):
        try:
            order_id = uuid.uuid4().__str__()
            customer_id = random.choice(customer_ids)
            order_data = {
                "order_id": order_id,
                "name": faker.text(max_nb_chars=20),
                "order_value": random.randint(10, 1000).__str__(),
                "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
                "order_date": faker.date_between(start_date='-30d', end_date='today').strftime('%Y-%m-%d'),
                "customer_id": customer_id,

            }
            orders_array.append(order_data)
        except Exception as e:
            print(e)
    return orders_array

def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        glue_database (str): The name of the glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) used to partition the data.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable data cleaning.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        method (str): The Hudi write method to use (default is 'upsert').
        index_type (str): The Hudi index type (e.g., BLOOM or GLOBAL_BLOOM).
    Returns:
        None
    """
    # These are the basic settings for the Hoodie table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie-conf hoodie.cleaner.parallelism": '200',
        'hoodie.cleaner.commits.retained': 5
    }

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
    }

    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
    }

    if enable_meta_data_indexing == True or enable_meta_data_indexing == "True" or enable_meta_data_indexing == "true":
        for key, value in hudi_meta_data_indexing.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    if enable_clustering == True or enable_clustering == "True" or enable_clustering == "true":
        for key, value in hudi_clustering.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # Add the Hudi index settings to the final settings dictionary
    for key, value in hudi_index_settings.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    for key, value in hudi_file_size.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # If partitioning is enabled, add the partition settings to the final settings
    if enable_partition == "True" or enable_partition == "true" or enable_partition == True:
        for key, value in partition_settings.items(): hudi_final_settings[key] = value

    # If data cleaning is enabled, add the cleaner options to the final settings
    if enable_cleaner == "True" or enable_cleaner == "true" or enable_cleaner == True:
        for key, value in hudi_cleaner_options.items(): hudi_final_settings[key] = value

    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if enable_hive_sync == "True" or enable_hive_sync == "true" or enable_hive_sync == True:
        for key, value in hudi_hive_sync_settings.items(): hudi_final_settings[key] = value

    # If there is data to write, apply any SQL transformations and write to the target path
    if spark_df.count() > 0:
        if use_sql_transformer == "True" or use_sql_transformer == "true" or use_sql_transformer == True:
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)

# Define total number of customers and order data sample size
total_customers = 10
order_data_sample_size = 20

# Generate customer data
customer_data = get_customer_data(total_customers=total_customers)

# Generate order data
order_data = get_orders_data(
    order_data_sample_size=order_data_sample_size,
    customer_ids=[i.get("customer_id") for i in customer_data]
)

# Define schema for customer data
customer_schema = StructType([
    StructField("customer_id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("state", StringType(), nullable=False),
    StructField("city", StringType(), nullable=False),
    StructField("email", StringType(), nullable=False),
    StructField("created_at", StringType(), nullable=False),
    StructField("address", StringType(), nullable=False)
])

# Create DataFrame for customer data using the defined schema
spark_df_customers = spark.createDataFrame(data=[tuple(i.values()) for i in customer_data], schema=customer_schema)

# Show the DataFrame
spark_df_customers.show()

# Define schema for order data (fields match those produced by get_orders_data)
order_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("order_value", StringType(), nullable=False),
    StructField("priority", StringType(), nullable=False),
    StructField("order_date", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False)
])

# Create DataFrame for order data using the defined schema
spark_df_orders = spark.createDataFrame(data=[tuple(i.values()) for i in order_data], schema=order_schema)

# Show the DataFrame
spark_df_orders.show()

BUCKET = "soumil-dev-bucket-1995"

upsert_hudi_table(
    glue_database="default",
    table_name="customers",
    record_id="customer_id",
    precomb_key="created_at",
    table_type='COPY_ON_WRITE',
    partition_fields="state",
    method='upsert',
    index_type='BLOOM',
    enable_partition=True,
    enable_cleaner=False,
    enable_hive_sync=True,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='false',
    use_sql_transformer=False,
    sql_transformer_query='default',
    target_path=f"s3://{BUCKET}/silver/table_name=customers/",
    spark_df=spark_df_customers,
)

upsert_hudi_table(
    glue_database="default",
    table_name="orders",
    record_id="order_id",
    precomb_key="order_date",
    table_type='COPY_ON_WRITE',
    partition_fields="default",
    method='upsert',
    index_type='BLOOM',
    enable_partition=False,
    enable_cleaner=False,
    enable_hive_sync=True,
    enable_clustering='False',
    clustering_column='default',
    enable_meta_data_indexing='false',
    use_sql_transformer=False,
    sql_transformer_query='default',
    target_path=f"s3://{BUCKET}/silver/table_name=orders/",
    spark_df=spark_df_orders,
)

config.yaml

sourceFormat: HUDI
targetFormats:
  - ICEBERG
datasets:
  -
    tableBasePath: s3://soumil-dev-bucket-1995/silver/table_name=customers/
    tableName: customers
    partitionSpec: state:VALUE

Result

SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions 1.7.x or earlier.
SLF4J: Ignoring binding found at [jar:file:/home/glue_user/workspace/jupyter_workspace/utilities-0.1.0-beta1-bundled.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See https://www.slf4j.org/codes.html#ignoredBindings for an explanation.
2024-03-08 19:40:15 INFO  io.onetable.utilities.RunSync:141 - Running sync for basePath s3://soumil-dev-bucket-1995/silver/table_name=customers/ for following table formats [ICEBERG]
2024-03-08 19:40:17 INFO  io.onetable.client.OneTableClient:264 - No previous OneTable sync for target. Falling back to snapshot sync.
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
2024-03-08 19:40:27 INFO  io.onetable.client.OneTableClient:127 - OneTable Sync is successful for the following formats [ICEBERG]
sh-4.2$ 
sh-4.2$ 

Solved

soumilshah1995 commented 6 months ago

@the-other-tim-brown closing the ticket.