apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[Bug] Hudi AWS Connector Throws Error on Hive Sync with Glue #7879

Closed soumilshah1995 closed 1 year ago

soumilshah1995 commented 1 year ago

Hello, we were using the AWS Marketplace connector. This morning, while I was preparing some Hudi labs, this error started showing up.

Code

try:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.session import SparkSession
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.types import *
    from datetime import datetime
    import boto3
    from functools import reduce
    import uuid
    from faker import Faker
except Exception as e:
    # Surface import failures instead of silently swallowing them
    print(f"Error importing libraries: {e}")

spark = SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true') \
    .getOrCreate()

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()
db_name = "hudidb"
table_name = "sample"

recordkey = 'emp_id'
path = "s3://soumilshah-hudi-demos/tmp/"
groupSize = "1048576"
method = 'upsert'
table_type = "COPY_ON_WRITE"

connection_options = {
    "path": path,
    "connectionName": "hudi-connection",

    "hoodie.datasource.write.storage.type": table_type,
    'className': 'org.apache.hudi',
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,

    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode": "hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
}

global faker
faker = Faker()

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                uuid.uuid4().__str__(),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex')
            ) for x in range(20)
        ]

data = DataGenerator.get_data()

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
spark_df = spark.createDataFrame(data=data, schema=columns)

WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext, "glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)
job.commit()

Error Message

Py4JJavaError: An error occurred while calling o111.pyWriteDynamicFrame.
: org.apache.hudi.hive.HoodieHiveSyncException: Got runtime exception when hive syncing
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:83)
    at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:539)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2(HoodieSparkSqlWriter.scala:595)
    at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2$adapted(HoodieSparkSqlWriter.scala:591)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:77)
    at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:591)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:665)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:286)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at com.amazonaws.services.glue.marketplace.connector.SparkCustomDataSink.writeDynamicFrame(CustomDataSink.scala:45)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:71)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to create HiveMetaStoreClient
    at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:92)
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:78)
    ... 48 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to verify existence of default database: com.amazonaws.services.glue.model.AccessDeniedException: Insufficient Lake Formation permission(s) on default (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException; Request ID: 02e6bfa7-f5c0-4f18-b223-112bb28bf480; Proxy: null))
    at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:239)
    at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:402)
    at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:335)
    at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:315)
    at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:291)
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.<init>(HMSDDLExecutor.java:68)
    at org.apache.hudi.hive.HoodieHiveClient.<init>(HoodieHiveClient.java:76)
    ... 49 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to verify existence of default database: com.amazonaws.services.glue.model.AccessDeniedException: Insufficient Lake Formation permission(s) on default (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException; Request ID: 02e6bfa7-f5c0-4f18-b223-112bb28bf480; Proxy: null))
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3991)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:251)
    at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:234)
    ... 55 more
Caused by: MetaException(message:Unable to verify existence of default database: com.amazonaws.services.glue.model.AccessDeniedException: Insufficient Lake Formation permission(s) on default (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException; Request ID: 02e6bfa7-f5c0-4f18-b223-112bb28bf480; Proxy: null))
    at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.doesDefaultDBExist(AWSCatalogMetastoreClient.java:244)
    at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.<init>(AWSCatalogMetastoreClient.java:152)
    at com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory.createMetaStoreClient(AWSGlueDataCatalogHiveClientFactory.java:20)
    at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClient(HiveUtils.java:507)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3746)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3726)
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3988)
    ... 57 more

Connector Version

(screenshot of the Marketplace connector version)

Note: I have run this lab before and it was all fine until this morning, when it started throwing the Hive sync error.

soumilshah1995 commented 1 year ago

Closing the issue, as the problem was with Lake Formation permissions.
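
For reference, a minimal sketch of granting the missing Lake Formation permission on the default database with boto3; the role ARN and permission level below are placeholders, not values from this issue:

import boto3

lf = boto3.client("lakeformation")

# Hypothetical example: let the Glue job's IAM role see the "default"
# database so the Hive sync existence check no longer fails.
lf.grant_permissions(
    Principal={
        # Assumption: replace with the role your Glue job runs as.
        "DataLakePrincipalIdentifier": "arn:aws:iam::111122223333:role/MyGlueJobRole"
    },
    Resource={"Database": {"Name": "default"}},
    Permissions=["DESCRIBE"],
)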

juanAmayaRamirez commented 1 year ago

Hi @soumilshah1995, just here to ask what the issue was. I am having a similar Lake Formation issue that I can't figure out when trying to read a Hudi table from the Data Catalog. Could this be related? If not, do you have any suggestions?

Glue config: Glue 4.0
Job parameters:
--datalake-formats hudi
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false

Code:

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer')\
    .config('spark.sql.hive.convertMetastoreParquet', 'false')\
    .config("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")\
    .config("spark.sql.avro.datetimeRebaseModeInWrite", "CORRECTED")\
    .getOrCreate()

glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()

dataFrame = glueContext.create_dynamic_frame_from_catalog(
    database = "my_db",
    table_name = "my_table"
)

Error:

2023-05-03 21:18:58,045 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:
Traceback (most recent call last):
  File "/tmp/read hudi without connector.py", line 37, in <module>
    dataFrame = glueContext.create_dynamic_frame_from_catalog(
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/context.py", line 188, in create_dynamic_frame_from_catalog
    return source.getFrame(**kwargs)
  File "/opt/amazon/lib/python3.7/site-packages/awsglue/data_source.py", line 37, in getFrame
    jframe = self._jsource.getDynamicFrame()
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o108.getDynamicFrame.
: java.lang.UnsupportedOperationException: Reads and writes using Lake Formation permissions are not supported for hudi tables.
    at com.amazonaws.services.glue.GlueUtility$.checkDataLakeFormatAndLakeFormation(GlueUtility.scala:54)
    at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:760)
    at com.amazonaws.services.glue.DataSource.getDynamicFrame(DataSource.scala:102)
    at com.amazonaws.services.glue.DataSource.getDynamicFrame$(DataSource.scala:102)
    at com.amazonaws.services.glue.AbstractSparkSQLDataSource.getDynamicFrame(DataSource.scala:726)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)

soumilshah1995 commented 1 year ago

Hey Buddy @juanAmayaRamirez

Just use Glue 4.0 and pass these parameters; it will be fixed:

"""
--additional-python-modules  | faker==11.3.0
--conf  |  spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.sql.legacy.pathOptionBehavior.enabled=true --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
--datalake-formats | hudi
"""
juanAmayaRamirez commented 1 year ago

Thanks for the quick response! (Love your videos, BTW.) But I'm sorry to say I am getting the same error.

An error occurred while calling o110.getDynamicFrame. Reads and writes using Lake Formation permissions are not supported for hudi tables.

I was able to read the table using Spark directly, like: dataFrame = spark.read.format("hudi").load("s3://bucket/path/to/my_table/")

BUT NOT with glueContext using a table already registered in the Glue Data Catalog.

dataFrame = glueContext.create_dynamic_frame_from_catalog(
    database = "my_db",
    table_name = "my_table"
)

According to the AWS docs (https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-hudi.html), both should work fine.
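
For now, a possible workaround sketch, assuming the table's S3 path is known: read with plain Spark (which worked above) and wrap the result in a DynamicFrame if downstream Glue transforms need one:

from awsglue.dynamicframe import DynamicFrame

# Read the Hudi table with the Spark datasource, then convert it to a
# DynamicFrame; the S3 path below is a placeholder.
spark_df = spark.read.format("hudi").load("s3://bucket/path/to/my_table/")
dyf = DynamicFrame.fromDF(spark_df, glueContext, "hudi_dyf")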

soumilshah1995 commented 1 year ago

@juanAmayaRamirez

Let's hop on a call; here is the link:

https://meet.google.com/gam-wsca-hxi