apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi Stored Procedure show clustering fails on AWS Glue 4.0 #8919

Closed soumilshah1995 closed 1 year ago

soumilshah1995 commented 1 year ago

Hello, hope all is well. While I was playing with clustering, I tried a few ways to perform clustering using stored procedures, and thought I'd create an issue so it can be tracked.

try:
    import sys
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
except Exception as e:
    print("Modules are missing: {}".format(e))

# Get command-line arguments
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Create a Spark session and Glue context
spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()
job.init(args['JOB_NAME'], args)

db_name = "hudidb"
table_name = "customers"
path = "s3://soumilshah-hudi-demos/silver/table_name=customers/"

query_show_commits = f"call show_commits('{db_name}.{table_name}', 5)"
spark_df_commits = spark.sql(query_show_commits)
commits = list(map(lambda row: row[0], spark_df_commits.collect()))
spark_df_commits.show()

try:
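    # Attempt 1: trigger clustering on the table via the run_clustering procedure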
    print("Trying clustering 1..")
    query_show_clustering = f"call run_clustering('{db_name}.{table_name}')"
    spark_df_clusterings = spark.sql(query_show_clustering)
    spark_df_clusterings.show()
    print(" clustering 1 complete ")
except Exception as e:
    print("Error 1", e)

try:
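    # Attempt 2: show clustering history by database.table identifier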
    print("Try show clustering 2")
    query = f"call show_clustering('{db_name}.{table_name}')"
    result_df = spark.sql(query)
    result_df.show()
    print("Complete show clustering 2 ")
except Exception as e:
    print("Error show clustering 2", e)

try:
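    # Attempt 3: show clustering history by passing the base path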
    print("Try show clustering 1 ")
    query = f"call show_clustering('{path}')"
    result_df = spark.sql(query)
    result_df.show()
    print("Complete show clustering 1 ")
except Exception as e:
    print("Error show clustering 1", e)

Output

+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|      commit_time|total_bytes_written|total_files_added|total_files_updated|total_partitions_written|total_records_written|total_update_records_written|total_errors|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|20230609120910005|           13079729|               30|                  0|                      30|                   72|                           0|           0|
|20230607130458189|                  0|                0|                  0|                       0|                    0|                           0|           0|
|20230607125355320|           18749045|               43|                  0|                      43|                  100|                           0|           0|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+

+-----------------+----------------+---------+-------------------+
|        timestamp|input_group_size|    state|involved_partitions|
+-----------------+----------------+---------+-------------------+
|20230609121906926|              30|COMPLETED|                  *|
+-----------------+----------------+---------+-------------------+

Output of show clustering (error):

An error occurred while calling o91.sql.
: java.util.NoSuchElementException: No value present in Option
    at org.apache.hudi.common.util.Option.get(Option.java:89)
    at org.apache.spark.sql.hudi.command.procedures.ShowClusteringProcedure.$anonfun$call$5(ShowClusteringProcedure.scala:79)
    at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
    at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
    at scala.collection.immutable.Stream.length(Stream.scala:312)
    at scala.collection.SeqLike.size(SeqLike.scala:108)
    at scala.collection.SeqLike.size$(SeqLike.scala:108)
    at scala.collection.AbstractSeq.size(Seq.scala:45)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:341)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at scala.collection.AbstractTraversable.toArray(Traversable.scala:108)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConne
ad1happy2go commented 1 year ago

@soumilshah1995 I am able to successfully run clustering with your code. The third block for show clustering fails as expected, since it tries to resolve the argument as a table name while we are passing a path.
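
For reference, a minimal sketch of how the path-based lookup could be written instead, assuming show_clustering accepts the named table and path arguments described in the Hudi procedures docs (the database, table, and S3 path below are the placeholders from the original report):

# Sketch only: named-argument calls to show_clustering (assumes table/path named args are supported).
db_name = "hudidb"
table_name = "customers"
path = "s3://soumilshah-hudi-demos/silver/table_name=customers/"

# Look up clustering history by table identifier
spark.sql(f"call show_clustering(table => '{db_name}.{table_name}')").show()

# Look up clustering history by base path; passing the path positionally makes the
# procedure try to resolve it as a table name, which is why the third block fails
spark.sql(f"call show_clustering(path => '{path}')").show()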

Can you clarify when you are seeing this error: java.util.NoSuchElementException: No value present in Option? I didn't hit this error with Glue 4.0.

Code - https://gist.github.com/ad1happy2go/4929bf4d4247c320431f147d9f1407f2

soumilshah1995 commented 1 year ago

let me try today and get back to you :D

soumilshah1995 commented 1 year ago

ALL SET

CODE

try:
    import sys, os, uuid
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions
    from pyspark.sql.types import *
    from faker import Faker
except Exception as e:
    print("Modules are missing: {}".format(e))

# Get command-line arguments
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Create a Spark session and Glue context
spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
         .config('spark.sql.hive.convertMetastoreParquet', 'false') \
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
         .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
         .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()
job.init(args['JOB_NAME'], args)

# =================================INSERTING DATA =====================================
global faker
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),

            ) for x in range(100)
        ]

data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
spark_df = spark.createDataFrame(data=data, schema=columns)

# ============================== Settings =======================================
db_name = "default"
table_name = "issue_8919"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://soumilshah-hudi-demos/output/" + table_name
method = 'upsert'
table_type = "COPY_ON_WRITE"
# ====================================================================================

hudi_part_write_config = {
    'className': 'org.apache.hudi',
    "hoodie.schema.on.read.enable": "true",
    "hoodie.datasource.write.reconcile.schema": "true",
    "hoodie.avro.schema.external.transformation": "true",
    'hoodie.avro.schema.validate': "true",
    "hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.enable': 'true'
}

spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)

query_show_commits = f"call show_commits('{db_name}.{table_name}', 5)"
spark_df_commits = spark.sql(query_show_commits)
commits = list(map(lambda row: row[0], spark_df_commits.collect()))
spark_df_commits.show()

try:
    print("Trying clustering 1..")
    query_show_clustering = f"call run_clustering('{db_name}.{table_name}')"
    spark_df_clusterings = spark.sql(query_show_clustering)
    spark_df_clusterings.show()
    print(" clustering 1 complete ")
except Exception as e:
    print("Error 1", e)
    raise e

try:
    print("Try show clustering 2")
    query = f"call show_clustering('{db_name}.{table_name}')"
    result_df = spark.sql(query)
    result_df.show()
    print("Complete show clustering 2 ")
except Exception as e:
    print("Error show clustering 2", e)
    raise e

Output:

+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|      commit_time|total_bytes_written|total_files_added|total_files_updated|total_partitions_written|total_records_written|total_update_records_written|total_errors|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+
|20230704125519577|             445969|                1|                  0|                       1|                  100|                           0|           0|
+-----------------+-------------------+-----------------+-------------------+------------------------+---------------------+----------------------------+------------+

Trying clustering 1..
+-----------------+----------------+---------+-------------------+
|        timestamp|input_group_size|    state|involved_partitions|
+-----------------+----------------+---------+-------------------+
|20230704125731917|               1|COMPLETED|                  *|
+-----------------+----------------+---------+-------------------+
 clustering 1 complete 
Try show clustering 2
+-----------------+----------------+---------+-------------------+
|        timestamp|input_group_size|    state|involved_partitions|
+-----------------+----------------+---------+-------------------+
|20230704125731917|               1|COMPLETED|                  *|
+-----------------+----------------+---------+-------------------+