apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Can't get Procedures working in Glue version 4. Getting error: py4j.protocol.Py4JJavaError: An error occurred while calling o96.sql. : java.lang.AssertionError: assertion failed: It's not a Hudi table #9844

Closed. luky777 closed this issue 11 months ago.

luky777 commented 11 months ago

Describe the problem you faced

Hi, I can't get procedures working in Glue version 4. I'm getting this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o96.sql.
: java.lang.AssertionError: assertion failed: It's not a Hudi table

Here is my Glue script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import concat_ws, col, split, size, lit
from pyspark.sql.types import StringType, BooleanType, DateType
import pyspark.sql.functions as F

args = getResolvedOptions(sys.argv, ["JOB_NAME", "hudi_table_name", "database_name", "s3_path_hudi", "s3_source", "id_column"])
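# The session below wires in Hudi support: Kryo serialization (recommended
# by Hudi), HoodieCatalog as the spark_catalog implementation, and the Hudi
# Spark session extension, which is what adds CALL procedure support to
# Spark SQL.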
spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.hive.convertMetastoreParquet','false') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
    .config('spark.sql.extensions','org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

hudi_table_name = args["hudi_table_name"]
database_name = args["database_name"]
s3_path_hudi = args["s3_path_hudi"]
s3_source = args["s3_source"]
id_column = args["id_column"]

#Load S3 Raw Data
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": True},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            s3_source
        ],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

# Hudi Settings
def get_hudi_connection_option(write_operation_type):
    connection_params = {
        "className": "org.apache.hudi",
        "hoodie.table.name": hudi_table_name,
        "path": s3_path_hudi,
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": write_operation_type,
        "hoodie.datasource.write.precombine.field": "timestamp",
        "hoodie.datasource.write.recordkey.field": id_column,
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.sync_as_datasource": "false",
        "hoodie.datasource.hive_sync.database": database_name,
        "hoodie.datasource.hive_sync.table": hudi_table_name,
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
        "hoodie.cleaner.fileversions.retained": 1,
        "hoodie.cleaner.parallelism": 200,
        "hoodie.upsert.shuffle.parallelism": 200,
        "hoodie.datasource.write.payload.class": "org.apache.hudi.payload.AWSDmsAvroPayload",
        "hoodie.datasource.write.transformer.class": "org.apache.hudi.utilities.transform.AWSDmsTransformer",
    }
    return connection_params

#Save UPSERT to Hudi Table
if S3bucket_node1.count() > 0:
    print("TOTAL: ", S3bucket_node1.count())
    hudi = glueContext.write_dynamic_frame.from_options(
                frame=S3bucket_node1,
                connection_type="marketplace.spark",
                connection_options=get_hudi_connection_option("upsert"),
                transformation_ctx="hudi",
            ) 

# Get the database and table names
db_name = database_name
table_name = hudi_table_name

# Create a query to call the Hudi Call Procedure
query = f"call show_commits('{db_name}.{table_name}', 5)"
# query = f"call help(cmd => 'show_commits')"
# query = "call show_savepoints('adpdb.hudi_raw_revenue_property')"

# Execute the query and show the results
spark_df_commits = spark.sql(query)
spark_df_commits.show()

job.commit()

This is what I have in the error log:

2023-10-10 14:00:56,574 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:
Traceback (most recent call last):
  File "/tmp/Hudi_CLI_Test.py", line 89, in <module>
    spark_df_commits = spark.sql(query)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o96.sql.
: java.lang.AssertionError: assertion failed: It's not a Hudi table
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.<init>(HoodieCatalogTable.scala:51)
    at org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:367)
    at org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:363)
    at org.apache.hudi.HoodieCLIUtils$.getHoodieCatalogTable(HoodieCLIUtils.scala:70)
    at org.apache.spark.sql.hudi.command.procedures.ShowCommitsProcedure.call(ShowCommitsProcedure.scala:82)
    at org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand.run(CallProcedureHoodieCommand.scala:33)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
    at org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
luky777 commented 11 months ago

Below I simplified the code to just run a Hudi procedure; these are my results. With the code below, the error I get is "It's not a Hudi table". If I change the line

spark_df_commits = spark.sql("call show_commits(adpdb.hudi_stage_jira, 5)")

to

spark_df_commits = spark.sql("call show_commits('adpdb.hudi_stage_jira', 5)")

then I get a different error:

ParseException:
== SQL ==
call show_commits('adpdb.hudi_stage_jira', 5)
^^^

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import concat_ws, col, split, size, lit
from pyspark.sql.types import StringType, BooleanType, DateType
import pyspark.sql.functions as F

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
spark = (SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.hive.convertMetastoreParquet','false') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
    .config('spark.sql.extensions','org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Execute the query and show the results
spark_df_commits = spark.sql("call show_commits(adpdb.hudi_stage_jira, 5)")
spark_df_commits.show()
job.commit()
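A quick way to check what the catalog actually registered is a plain Spark SQL DESCRIBE (a diagnostic sketch using the table name from this report; for a table synced as a Spark datasource table, the Provider field should read hudi):

# Inspect the Glue Catalog entry; if the Provider field is absent or shows
# hive/parquet rather than hudi, the "It's not a Hudi table" assertion trips.
spark.sql("describe table extended adpdb.hudi_stage_jira").show(100, truncate=False)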
ad1happy2go commented 11 months ago

@luky777 I also got this error when I tried. Did you get a chance to check this with AWS support?

luky777 commented 11 months ago

@ad1happy2go The issue with my setup was this config when saving to the Hudi table:

"hoodie.datasource.hive_sync.sync_as_datasource": "false"

Once I removed this line it worked, presumably because with it set to "false" hive sync registers the table as a plain Hive table rather than as a Spark datasource table with the hudi provider, which is what the "It's not a Hudi table" assertion checks for. Here are my hoodie settings:

"className": "org.apache.hudi",

"hoodie.table.name": hudi_table_name,

"path": s3_path_hudi,

"hoodie.datasource.write.table.type": "MERGE_ON_READ",

"hoodie.datasource.write.operation": write_operation_type,

"hoodie.datasource.write.precombine.field": "timestamp",

"hoodie.datasource.write.recordkey.field": id_column,

"hoodie.datasource.write.hive_style_partitioning": "true",

"hoodie.datasource.hive_sync.enable": "true",

"hoodie.datasource.hive_sync.database": database_name,

"hoodie.datasource.hive_sync.table": hudi_table_name,

"hoodie.datasource.hive_sync.use_jdbc": "false",

"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",

"hoodie.compact.inline": "false",

"hoodie.compact.inline.max.delta.commits": 1,

"hoodie.compact.schedule.inline": "true",

"hoodie.parquet.small.file.limit": 104857600,

"hoodie.parquet.max.file.size": 125829120,

"hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS" ,

"hoodie.cleaner.fileversions.retained": 1 ,

"hoodie.cleaner.parallelism": 200,

"hoodie.upsert.shuffle.parallelism": 200,

"hoodie.datasource.write.payload.class" : "org.apache.hudi.payload.AWSDmsAvroPayload",

"hoodie.datasource.write.transformer.class" :"org.apache.hudi.utilities.transform.AWSDmsTransformer",