apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] How to use hudi-defaults.conf with Glue #5291

Closed. MoustafaAMahmoud closed this issue 2 years ago.

MoustafaAMahmoud commented 2 years ago

Describe the problem you faced

I tried to use hudi-defaults.conf with Glue, setting the path of the file through both the Spark config and a Python environment variable, but it doesn't work. I checked https://github.com/apache/hudi/pull/4167 but I can't find a clear explanation of how to use it.

Spark config (PySpark):

spark = SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .config('spark.yarn.appMasterEnv.HUDI_CONF_DIR', args['HUDI_CONF_DIR']) \
    .config('spark.executorEnv.HUDI_CONF_DIR', args['HUDI_CONF_DIR']) \
    .getOrCreate()

Env Config:

HUDI_CONF_DIR='s3://glue-development-bucket/scripts/hudi-conf/hudi-default.conf'
os.environ['HUDI_CONF_DIR'] = args['HUDI_CONF_DIR']

I am getting the same error every time. I am not sure whether there is a clear example of how to use this feature with Spark or Glue.

Expected behavior

Hudi should load hudi-defaults.conf from the directory pointed to by HUDI_CONF_DIR.

Environment Description

Glue 3 + Hudi 0.10.1 + S3 (details below)

Stacktrace

2022-04-19 00:34:37,012 WARN [Thread-10] config.DFSPropertiesConfiguration (DFSPropertiesConfiguration.java:getConfPathFromEnv(188)): Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
--
2022-04-19 00:34:37,085 WARN [Thread-10] config.DFSPropertiesConfiguration (DFSPropertiesConfiguration.java:addPropsFromFile(131)): Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file

In the test example, DFSPropertiesConfiguration.refreshGlobalProps() is used to refresh the props, but I am not sure how to call this from PySpark (see the sketch below).
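For what it's worth, a hedged sketch of invoking that refresh from PySpark through the py4j gateway (untested; it assumes the Hudi bundle is on the driver classpath):

# Assumes an existing SparkSession named `spark`. Note that the JVM only sees
# HUDI_CONF_DIR if it was set before the JVM started (for example via
# spark.yarn.appMasterEnv.HUDI_CONF_DIR); os.environ changes made afterwards
# are not visible to the Java side.
jvm = spark.sparkContext._jvm
jvm.org.apache.hudi.common.config.DFSPropertiesConfiguration.refreshGlobalProps()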

nsivabalan commented 2 years ago

@zhedoubushishi: can you chime in here please?

zhedoubushishi commented 2 years ago

Sorry, I don't fully understand your question. This should work: os.environ['HUDI_CONF_DIR'] = args['HUDI_CONF_DIR']. But what does args['HUDI_CONF_DIR'] stand for?

MoustafaAMahmoud commented 2 years ago

@zhedoubushishi I updated the issue. args['HUDI_CONF_DIR'] is the S3 URI of the Hudi config. For example:

HUDI_CONF_DIR='s3://glue-development-bucket/scripts/hudi-conf/hudi-default.conf'

I am testing with Glue 3 + Hudi 0.10.1 + S3.

This file contains the following:

# Default system properties included when running Hudi jobs.
# This is useful for setting default environmental settings.

# Example:
hoodie.datasource.write.table.type                      COPY_ON_WRITE
hoodie.datasource.write.hive_style_partitioning         false

# commonConfig      
className                                               org.apache.hudi
hoodie.datasource.hive_sync.use_jdbc                    false
hoodie.datasource.write.precombine.field                tpep_pickup_datetime
hoodie.datasource.write.recordkey.field                 pk_col
hoodie.table.name                                       ny_yellow_trip_data_paritioned
hoodie.consistency.check.enabled                        true
hoodie.datasource.hive_sync.database                    hudi10
hoodie.datasource.hive_sync.table                       ny_yellow_trip_data_paritioned
hoodie.datasource.hive_sync.enable                      true
hoodie.metrics.on                                       true
hoodie.metrics.reporter.type                            CLOUDWATCH
path                                                    s3://hudi-update/hudi10/ny_yellow_trip_data_partitioned

#partitionDataConfig    
hoodie.datasource.write.partitionpath.field             payment_type
hoodie.datasource.hive_sync.partition_extractor_class   org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields            payment_type
hoodie.datasource.write.hive_style_partitioning         true

# incrementalConfig
hoodie.datasource.write.operation                       upsert
hoodie.cleaner.policy                                   KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained                         1

zhedoubushishi commented 2 years ago

First, it should be something like HUDI_CONF_DIR='s3://glue-development-bucket/scripts/hudi-conf/'; this is just the directory path. Also, the config file name should be hudi-defaults.conf rather than hudi-default.conf, since the file name is hard-coded.
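A minimal sketch of that correction (same bucket and prefix as above):

import os

# HUDI_CONF_DIR must point at the directory, not the file, and the file inside
# it must be named exactly hudi-defaults.conf (the name is hard-coded in Hudi).
os.environ['HUDI_CONF_DIR'] = 's3://glue-development-bucket/scripts/hudi-conf/'
# expected object: s3://glue-development-bucket/scripts/hudi-conf/hudi-defaults.conf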

MoustafaAMahmoud commented 2 years ago

The warning disappeared at first, but it is still there. I will verify whether it is working and share the output; I added the details in the next comment.

MoustafaAMahmoud commented 2 years ago

@zhedoubushishi it does not seem to be working:


import sys
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when,lit
from awsglue.utils import getResolvedOptions
from pyspark.sql.types import *
from datetime import datetime
import os
args = getResolvedOptions(sys.argv, ['JOB_NAME','curated_bucket'])
os.environ['HUDI_CONF_DIR'] = 's3://glue-development-bucket/scripts/hudi-conf/'

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer')\
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .getOrCreate()

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

db_name = 'hudi_sql_dml_ddl_hive'
bucket_name = args['curated_bucket']
bucket_prefix = "data"
table_prefix = f"{bucket_prefix}/{db_name}"
table_base_location=f"s3://{bucket_name}/{table_prefix}"
table_name='hudi_tbl_partitioned'
s3_output_path = f"{table_base_location}/{table_name}"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name};")
spark.sql(f"use  {db_name};")
spark.sql("show tables;").show()

query_create_tbl = f"""
create table if not exists {db_name}.{table_name} (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
)   using hudi
options (
  hoodie.table.name = '{table_name}',
  path = '{s3_output_path}',
  hoodie.datasource.hive_sync.table = '{table_name}'
)
partitioned by (dt, hh);
"""

spark.sql(query_create_tbl)

print(f"""SHOW CREATE TABLE {table_name};""")
spark.sql(f"SHOW CREATE TABLE {table_name};").show(truncate=False)

print(f"""show partitions {table_name};""")
spark.sql(f"""show partitions {table_name};""").show(truncate=False)

#insert dynamic partition
print(f"""insert dynamic partition""")
print(f"""insert into {db_name}.{table_name} partition (dt, hh) select 1 as product_id, 'a1' as product_name, 1000 as ts, '2021-12-09' as dt, '10' as hh;""")
spark.sql(f"""insert into {db_name}.{table_name} partition (dt, hh) select 1 as product_id, 'a1' as product_name, 1000 as ts, '2021-12-09' as dt, '10' as hh;""")
spark.sql(f"select * from {db_name}.{table_name};").show(truncate=False)

print("insert static partition")
spark.sql(f"""insert into {db_name}.{table_name} partition(dt = '2021-12-09', hh='11') select 2, 'a2', 2000;""")
spark.sql(f"""insert into {db_name}.{table_name} partition(dt = '2021-12-10', hh='11') select 3, 'a3', 3000;""")
spark.sql(f"""insert into {db_name}.{table_name} partition(dt = '2021-12-10', hh='10') select 4, 'a4', 4000;""")
spark.sql(f"select * from {db_name}.{table_name};").show(truncate=False)

#show partition:
print(f"""show partitions {table_name};""")
spark.sql(f"""show partitions {table_name};""").show(truncate=False)

#drop partition:
print(f"""alter table {table_name} drop partition (dt='2021-12-09', hh='10');""")
spark.sql(f"""alter table {table_name} drop partition (dt='2021-12-09', hh='10');""")

print(f"""show partitions {table_name}; after alter""")
#show partition:
spark.sql(f"""show partitions {table_name};""").show(truncate=False)

spark.sql(f"select * from {db_name}.{table_name};").show(truncate=False)

hudi-defaults.conf

# Default system properties included when running Hudi jobs.
# This is useful for setting default environmental settings.

# Example:
hoodie.datasource.write.table.type                      MERGE_ON_READ

# commonConfig
className                                               org.apache.hudi
hoodie.datasource.hive_sync.use_jdbc                    false
hoodie.datasource.write.precombine.field                ts
hoodie.datasource.write.recordkey.field                 id
hoodie.consistency.check.enabled                        true
hoodie.datasource.hive_sync.database                    hudi_sql_dml_ddl_hive
hoodie.datasource.hive_sync.enable                      true
hoodie.metrics.on                                       true
hoodie.metrics.reporter.type                            CLOUDWATCH

#partitionDataConfig
hoodie.datasource.hive_sync.partition_extractor_class   org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.write.hive_style_partitioning         true
hoodie.datasource.write.partitionpath.urlencode         true
hoodie.datasource.hive_sync.partition_fields            dt,hh
hoodie.datasource.write.partitionpath.field             dt,hh

# incrementalConfig
hoodie.datasource.write.operation                       upsert
hoodie.cleaner.policy                                   KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained                         1

hoodie.insert.shuffle.parallelism                       1
hoodie.upsert.shuffle.parallelism                       1
hoodie.update.shuffle.parallelism                       1

Full logs

awsglue-todworkers-iad-prod-2d-0a589c0e.us-east-1.amazon.com Tue Mar 29 16:05:52 UTC 2022 gluetod
Preparing ...
Wed Apr 20 00:27:20 UTC 2022
/usr/bin/java -cp /tmp:/opt/amazon/conf:/opt/amazon/glue-manifest.jar com.amazonaws.services.glue.PrepareLaunch --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=4 --conf spark.executor.memory=10g --conf spark.executor.cores=8 --conf spark.driver.memory=10g --conf spark.default.parallelism=40 --conf spark.sql.shuffle.partitions=40 --conf spark.network.timeout=600   --enable-glue-datacatalog true --job-bookmark-option job-bookmark-enable --TempDir s3://glue-development-bucket/temp/ --extra-jars s3://glue-development-bucket/jars/glue3/hudi-utilities-bundle_2.12-0.10.1.jar,s3://glue-development-bucket/jars/glue3/hudi-spark3.1.1-bundle_2.12-0.10.1.jar,s3://glue-development-bucket/jars/glue3/calcite-core-1.10.0.jar,s3://glue-development-bucket/jars/glue3/datanucleus-core-4.1.17.jar  --class GlueApp --JOB_ID j_b5ac6a0ebf4aaec1f512936c33608ce4cdd7601ac725d5e67710eaefe1581c46 --curated_bucket hudi-upgrade --enable-metrics true --spark-event-logs-path s3://glue-development-bucket/logs/  --JOB_RUN_ID jr_d23ecbb5f94c3aa3926907fde84f202e5830a8d7676ade74c2cc7bf3522642df --enable-continuous-cloudwatch-log true --scriptLocation s3://glue-development-bucket/scripts/hudi-spark-sql-test.py  --job-language python --JOB_NAME hudi-spark-sql-test
1650414449874
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/amazon/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/amazon/lib/Log4j-slf4j-2.x.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/amazon/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
INFO    2022-04-20 00:27:31,785 0   com.amazonaws.services.glue.PrepareLaunch   [main]  glue.etl.telemetry.runtimeImproveFeature.jobInsights30, jr_d23ecbb5f94c3aa3926907fde84f202e5830a8d7676ade74c2cc7bf3522642df
Glue analyzer rules s3://aws-glue-jes-prod-us-east-1-assets/etl/analysis-rules/glue3/
GlueTelemetry: Current region us-east-1
GlueTelemetry: Glue Endpoint https://glue.us-east-1.amazonaws.com
GlueTelemetry: Prod env...false
GlueTelemetry: is disabled
Glue ETL Marketplace - Start ETL connector activation process...
Glue ETL Marketplace - no connections are attached to job hudi-spark-sql-test, no need to download custom connector jar for it.
Glue ETL Marketplace - Retrieved no ETL connector jars, this may be due to no marketplace/custom connection attached to the job or failure of downloading them, please scroll back to the previous logs to find out the root cause. Container setup continues.
Glue ETL Marketplace - ETL connector activation process finished, container setup continues...
Launching ...
Wed Apr 20 00:27:36 UTC 2022
/usr/bin/java -cp /tmp:/opt/amazon/conf:/opt/amazon/glue-manifest.jar:/tmp/* -Dlog4j.configuration=log4j -server -Xmx10g -XX:+UseG1GC -XX:MaxHeapFreeRatio=70 -XX:InitiatingHeapOccupancyPercent=45 -XX:OnOutOfMemoryError=kill -9 %p -XX:+UseCompressedOops -Djavax.net.ssl.trustStore=/opt/amazon/certs/ExternalAndAWSTrustStore.jks -Djavax.net.ssl.trustStoreType=JKS -Djavax.net.ssl.trustStorePassword=amazon -DRDS_ROOT_CERT_PATH=/opt/amazon/certs/rds-combined-ca-bundle.pem -DREDSHIFT_ROOT_CERT_PATH=/opt/amazon/certs/redshift-ssl-ca-cert.pem -DRDS_TRUSTSTORE_URL=file:/opt/amazon/certs/RDSTrustStore.jks -Dspark.network.timeout=600 -Dspark.metrics.conf.*.source.jvm.class=org.apache.spark.metrics.source.JvmSource -Dspark.executor.instances=4 -Dspark.driver.cores=4 -Dspark.metrics.conf.*.source.system.class=org.apache.spark.metrics.source.SystemMetricsSource -Dspark.sql.parquet.output.committer.class=com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter -Dspark.script.location=s3://glue-development-bucket/scripts/hudi-spark-sql-test.py -Dspark.sql.shuffle.partitions=16 -Dspark.metrics.conf.*.sink.GlueCloudwatch.jobRunId=jr_d23ecbb5f94c3aa3926907fde84f202e5830a8d7676ade74c2cc7bf3522642df -Dspark.hadoop.aws.glue.endpoint=https://glue.us-east-1.amazonaws.com -Dspark.glueExceptionAnalysisEventLog.dir=/tmp/glue-exception-analysis-logs/ -Dspark.hadoop.mapred.output.direct.EmrFileSystem=true -Dspark.glueJobInsights.enabled=true -Dspark.glue.JOB_NAME=hudi-spark-sql-test -Dspark.glue.USE_PROXY=false -Dspark.eventLog.dir=/tmp/spark-event-logs/ -Dspark.extraListeners=com.amazonaws.services.glueexceptionanalysis.GlueExceptionAnalysisListener -Dspark.hadoop.fs.s3.impl=com.amazon.ws.emr.hadoop.fs.EmrFileSystem -Dspark.hadoop.mapred.output.direct.NativeS3FileSystem=true -Dspark.pyspark.python=/usr/bin/python3 -Dspark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory -Dspark.authenticate.secret=<HIDDEN> -Dspark.glue.extra-jars=s3://glue-development-bucket/jars/glue3/hudi-utilities-bundle_2.12-0.10.1.jar,s3://glue-development-bucket/jars/glue3/hudi-spark3.1.1-bundle_2.12-0.10.1.jar,s3://glue-development-bucket/jars/glue3/calcite-core-1.10.0.jar,s3://glue-development-bucket/jars/glue3/datanucleus-core-4.1.17.jar -Dspark.cloudwatch.logging.ui.showConsoleProgress=true -Dspark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 -Dspark.hadoop.glue.michiganCredentialsProviderProxy=com.amazonaws.services.glue.remote.LakeformationCredentialsProvider -Dspark.metrics.conf.driver.source.aggregate.class=org.apache.spark.metrics.source.AggregateMetricsSource -Dspark.glue.JOB_RUN_ID=jr_d23ecbb5f94c3aa3926907fde84f202e5830a8d7676ade74c2cc7bf3522642df -Dspark.glue.enable-continuous-cloudwatch-log=true -Dspark.glue.endpoint=https://glue-jes-prod.us-east-1.amazonaws.com -Dspark.sql.catalogImplementation=hive -Dspark.default.parallelism=16 -Dspark.ui.enabled=false -Dspark.driver.extraClassPath=/tmp:/opt/amazon/conf:/opt/amazon/glue-manifest.jar -Dspark.glue.GLUE_TASK_GROUP_ID=b8ef3e1f-06f7-40c2-b9f8-5a1756468c6c -Dspark.authenticate=true -Dspark.dynamicAllocation.enabled=false -Dspark.dynamicAllocation.maxExecutors=4 -Dspark.shuffle.service.enabled=false -Dspark.hadoop.mapred.output.committer.class=org.apache.hadoop.mapred.DirectOutputCommitter -Dspark.executor.extraClassPath=/tmp:/opt/amazon/conf:/opt/amazon/glue-manifest.jar -Dspark.glue.GLUE_VERSION=3.0 -Dspark.executor.cores=4 
-Dspark.hadoop.lakeformation.credentials.url=http://localhost:9998/lakeformationcredentials -Dspark.driver.host=172.35.173.163 -Dspark.app.name=nativespark-hudi-spark-sql-test-jr_d23ecbb5f94c3aa3926907fde84f202e5830a8d7676ade74c2cc7bf3522642df -Dspark.sql.parquet.fs.optimized.committer.optimization-enabled=true -Dspark.rpc.askTimeout=600 -Dspark.metrics.conf.*.sink.GlueCloudwatch.jobName=hudi-spark-sql-test -Dspark.metrics.conf.*.source.s3.class=org.apache.spark.metrics.source.S3FileSystemSource -Dspark.metrics.conf.*.sink.GlueCloudwatch.namespace=Glue -Dspark.executor.memory=10g -Dspark.metrics.conf.*.sink.GlueCloudwatch.class=org.apache.spark.metrics.sink.GlueCloudwatchSink -Dspark.driver.memory=10g -Dspark.hadoop.fs.s3.buffer.dir=/tmp/hadoop-spark/s3 -Dspark.glue.GLUE_COMMAND_CRITERIA=glueetl -Dspark.master=jes -Dspark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs=false -Dspark.unsafe.sorter.spill.read.ahead.enabled=false -Dspark.hadoop.parquet.enable.summary-metadata=false -Dspark.glueAppInsightsLog.dir=/tmp/glue-app-insights-logs/ -Dspark.glue.jobLanguage=python -Dspark.files.overwrite=true -Dspark.cloudwatch.logging.conf.jobRunId=jr_d23ecbb5f94c3aa3926907fde84f202e5830a8d7676ade74c2cc7bf3522642df com.amazonaws.services.glue.ProcessLauncher --launch-class org.apache.spark.deploy.PythonRunner /opt/amazon/bin/runscript.py  /tmp/hudi-spark-sql-test.py true --job-bookmark-option job-bookmark-enable --TempDir s3://glue-development-bucket/temp/ --JOB_ID j_b5ac6a0ebf4aaec1f512936c33608ce4cdd7601ac725d5e67710eaefe1581c46 --curated_bucket hudi-upgrade true --spark-event-logs-path s3://glue-development-bucket/logs/ --JOB_RUN_ID jr_d23ecbb5f94c3aa3926907fde84f202e5830a8d7676ade74c2cc7bf3522642df --JOB_NAME hudi-spark-sql-test
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/amazon/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/amazon/lib/Log4j-slf4j-2.x.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/amazon/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
Continuous Logging: Creating cloudwatch appender.
Continuous Logging: Creating cloudwatch appender.
Continuous Logging: Creating cloudwatch appender.
Continuous Logging: Creating cloudwatch appender.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

SLF4J: A number (14) of logging calls during the initialization phase have been intercepted and are

SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.

SLF4J: See also http://www.slf4j.org/codes.html#replay

2022-04-20 00:27:40,055 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): SLF4J: Class path contains multiple SLF4J bindings.
2022-04-20 00:27:40,058 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): SLF4J: Found binding in [jar:file:/opt/amazon/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
2022-04-20 00:27:40,059 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): SLF4J: Found binding in [jar:file:/opt/amazon/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
2022-04-20 00:27:40,059 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): SLF4J: Found binding in [jar:file:/opt/amazon/lib/Log4j-slf4j-2.x.jar!/org/slf4j/impl/StaticLoggerBinder.class]
2022-04-20 00:27:40,059 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
2022-04-20 00:27:40,076 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-04-20 00:27:40,459 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
2022-04-20 00:27:40,460 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): log4j:WARN Please initialize the log4j system properly.
2022-04-20 00:27:40,460 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2022-04-20 00:27:41,440 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): ==== Netty Server Started ====
2022-04-20 00:27:41,443 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): input rulesFilePath=/tmp/glue_app_analyzer_rules
2022-04-20 00:27:41,565 INFO [main] glue.SafeLogging (Logging.scala:logInfo(57)): Initializing logging subsystem
2022-04-20 00:27:43,949 INFO [Thread-10] spark.SparkContext (Logging.scala:logInfo(57)): Running Spark version 3.1.1-amzn-0
2022-04-20 00:27:44,038 INFO [Thread-10] resource.ResourceUtils (Logging.scala:logInfo(57)): ==============================================================
2022-04-20 00:27:44,040 INFO [Thread-10] resource.ResourceUtils (Logging.scala:logInfo(57)): No custom resources configured for spark.driver.
2022-04-20 00:27:44,041 INFO [Thread-10] resource.ResourceUtils (Logging.scala:logInfo(57)): ==============================================================
2022-04-20 00:27:44,043 INFO [Thread-10] spark.SparkContext (Logging.scala:logInfo(57)): Submitted application: nativespark-hudi-spark-sql-test-jr_d23ecbb5f94c3aa3926907fde84f202e5830a8d7676ade74c2cc7bf3522642df
2022-04-20 00:27:44,076 INFO [Thread-10] resource.ResourceProfile (Logging.scala:logInfo(57)): Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 4, script: , vendor: , memory -> name: memory, amount: 10240, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
2022-04-20 00:27:44,101 INFO [Thread-10] resource.ResourceProfile (Logging.scala:logInfo(57)): Limiting resource is cpus at 4 tasks per executor
2022-04-20 00:27:44,107 INFO [Thread-10] resource.ResourceProfileManager (Logging.scala:logInfo(57)): Added ResourceProfile id: 0
2022-04-20 00:27:44,212 INFO [Thread-10] spark.SecurityManager (Logging.scala:logInfo(57)): Changing view acls to: spark
2022-04-20 00:27:44,212 INFO [Thread-10] spark.SecurityManager (Logging.scala:logInfo(57)): Changing modify acls to: spark
2022-04-20 00:27:44,213 INFO [Thread-10] spark.SecurityManager (Logging.scala:logInfo(57)): Changing view acls groups to: 
2022-04-20 00:27:44,214 INFO [Thread-10] spark.SecurityManager (Logging.scala:logInfo(57)): Changing modify acls groups to: 
2022-04-20 00:27:44,214 INFO [Thread-10] spark.SecurityManager (Logging.scala:logInfo(57)): SecurityManager: authentication enabled; ui acls disabled; users  with view permissions: Set(spark); groups with view permissions: Set(); users  with modify permissions: Set(spark); groups with modify permissions: Set()
2022-04-20 00:27:44,605 INFO [Thread-10] util.Utils (Logging.scala:logInfo(57)): Successfully started service 'sparkDriver' on port 32837.
2022-04-20 00:27:44,672 INFO [Thread-10] spark.SparkEnv (Logging.scala:logInfo(57)): Registering MapOutputTracker
2022-04-20 00:27:44,727 INFO [Thread-10] spark.SparkEnv (Logging.scala:logInfo(57)): Registering BlockManagerMaster
2022-04-20 00:27:44,769 INFO [Thread-10] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(57)): Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2022-04-20 00:27:44,770 INFO [Thread-10] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(57)): BlockManagerMasterEndpoint up
2022-04-20 00:27:44,792 INFO [Thread-10] spark.SparkEnv (Logging.scala:logInfo(57)): Registering BlockManagerMasterHeartbeat
2022-04-20 00:27:44,864 INFO [Thread-10] storage.DiskBlockManager (Logging.scala:logInfo(57)): Created local directory at /tmp/blockmgr-1c1f7a46-9b63-4617-938e-45f9b509cc03
2022-04-20 00:27:44,921 INFO [Thread-10] memory.MemoryStore (Logging.scala:logInfo(57)): MemoryStore started with capacity 5.8 GiB
2022-04-20 00:27:44,956 INFO [Thread-10] spark.SparkEnv (Logging.scala:logInfo(57)): Registering OutputCommitCoordinator
2022-04-20 00:27:45,131 INFO [Thread-10] scheduler.JESSchedulerBackend$JESAsSchedulerBackendEndpoint (Logging.scala:logInfo(57)): JESAsSchedulerBackendEndpoint
2022-04-20 00:27:45,133 INFO [Thread-10] scheduler.JESSchedulerBackend (Logging.scala:logInfo(57)): JESSchedulerBackend
2022-04-20 00:27:45,456 INFO [allocator] glue.TaskGroupInterface (Logging.scala:logInfo(57)): creating executor task for executor 1
2022-04-20 00:27:45,492 INFO [Thread-10] util.Utils (Logging.scala:logInfo(57)): Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 39319.
2022-04-20 00:27:45,493 INFO [Thread-10] netty.NettyBlockTransferService (NettyBlockTransferService.scala:init(81)): Server created on 172.35.173.163:39319
2022-04-20 00:27:45,497 INFO [Thread-10] storage.BlockManager (Logging.scala:logInfo(57)): Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2022-04-20 00:27:45,520 INFO [Thread-10] storage.BlockManagerMaster (Logging.scala:logInfo(57)): Registering BlockManager BlockManagerId(driver, 172.35.173.163, 39319, None)
2022-04-20 00:27:45,524 INFO [dispatcher-BlockManagerMaster] storage.BlockManagerMasterEndpoint (Logging.scala:logInfo(57)): Registering block manager 172.35.173.163:39319 with 5.8 GiB RAM, BlockManagerId(driver, 172.35.173.163, 39319, None)
2022-04-20 00:27:45,530 INFO [Thread-10] storage.BlockManagerMaster (Logging.scala:logInfo(57)): Registered BlockManager BlockManagerId(driver, 172.35.173.163, 39319, None)
2022-04-20 00:27:45,531 INFO [Thread-10] storage.BlockManager (Logging.scala:logInfo(57)): Initialized BlockManager: BlockManagerId(driver, 172.35.173.163, 39319, None)
2022-04-20 00:27:45,684 INFO [Thread-10] sink.GlueCloudwatchSink (GlueCloudwatchSink.scala:logInfo(22)): CloudwatchSink: Obtained credentials from the Instance Profile
2022-04-20 00:27:45,753 INFO [Thread-10] sink.GlueCloudwatchSink (GlueCloudwatchSink.scala:logInfo(22)): CloudwatchSink: jobName: hudi-spark-sql-test jobRunId: jr_d23ecbb5f94c3aa3926907fde84f202e5830a8d7676ade74c2cc7bf3522642df
2022-04-20 00:27:45,952 INFO [Thread-10] util.log (Log.java:initialized(169)): Logging initialized @9785ms to org.sparkproject.jetty.util.log.Slf4jLog
2022-04-20 00:27:46,170 INFO [Thread-10] glueexceptionanalysis.EventLogFileWriter (FileWriter.scala:start(51)): Started file writer for com.amazonaws.services.glueexceptionanalysis.EventLogFileWriter
2022-04-20 00:27:46,200 INFO [Thread-10] glueexceptionanalysis.EventLogSocketWriter (SocketWriter.scala:start(34)): Socket client for com.amazonaws.services.glueexceptionanalysis.EventLogSocketWriter started correctly
2022-04-20 00:27:46,222 INFO [Thread-10] spark.SparkContext (Logging.scala:logInfo(57)): Registered listener com.amazonaws.services.glueexceptionanalysis.GlueExceptionAnalysisListener
2022-04-20 00:27:46,322 INFO [allocator] glue.TaskGroupInterface (Logging.scala:logInfo(57)): createChildTask API response code 200
2022-04-20 00:27:46,331 INFO [allocator] glue.ExecutorTaskManagement (Logging.scala:logInfo(57)): executor task g-1b2e819c3f4b83c04b4894f402137ccdb61cf885 created for executor 1
2022-04-20 00:27:46,336 INFO [allocator] glue.TaskGroupInterface (Logging.scala:logInfo(57)): creating executor task for executor 2
2022-04-20 00:27:46,333 INFO [Thread-10] scheduler.JESSchedulerBackend (Logging.scala:logInfo(57)): SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
2022-04-20 00:27:46,602 INFO [allocator] glue.TaskGroupInterface (Logging.scala:logInfo(57)): createChildTask API response code 200
2022-04-20 00:27:46,602 INFO [allocator] glue.ExecutorTaskManagement (Logging.scala:logInfo(57)): executor task g-35e4f8ad9be7146bb0a17daa6bbf5f0aaeb24ad5 created for executor 2
2022-04-20 00:27:46,603 INFO [allocator] glue.TaskGroupInterface (Logging.scala:logInfo(57)): creating executor task for executor 3
2022-04-20 00:27:46,900 INFO [allocator] glue.TaskGroupInterface (Logging.scala:logInfo(57)): createChildTask API response code 200
2022-04-20 00:27:46,901 INFO [allocator] glue.ExecutorTaskManagement (Logging.scala:logInfo(57)): executor task g-205779aebb30eba354306f8d2a7c98052d7de372 created for executor 3
2022-04-20 00:27:46,902 INFO [allocator] glue.TaskGroupInterface (Logging.scala:logInfo(57)): creating executor task for executor 4
2022-04-20 00:27:47,080 INFO [Thread-10] internal.SharedState (Logging.scala:logInfo(57)): Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/tmp/spark-warehouse').
2022-04-20 00:27:47,081 INFO [Thread-10] internal.SharedState (Logging.scala:logInfo(57)): Warehouse path is 'file:/tmp/spark-warehouse'.
2022-04-20 00:27:47,252 INFO [allocator] glue.TaskGroupInterface (Logging.scala:logInfo(57)): createChildTask API response code 200
2022-04-20 00:27:47,253 INFO [allocator] glue.ExecutorTaskManagement (Logging.scala:logInfo(57)): executor task g-a4c51ec9fbbe46ea1cbf28d272842588bf0c9e00 created for executor 4
2022-04-20 00:27:48,338 INFO [Thread-10] glue.GlueContext (GlueContext.scala:<init>(117)): GlueMetrics configured and enabled
2022-04-20 00:27:49,312 INFO [Thread-10] util.PlatformInfo (PlatformInfo.java:getJobFlowId(56)): Unable to read clusterId from http://localhost:8321/configuration, trying extra instance data file: /var/lib/instance-controller/extraInstanceData.json
2022-04-20 00:27:49,316 INFO [Thread-10] util.PlatformInfo (PlatformInfo.java:getJobFlowId(63)): Unable to read clusterId from /var/lib/instance-controller/extraInstanceData.json, trying EMR job-flow data file: /var/lib/info/job-flow.json
2022-04-20 00:27:49,317 INFO [Thread-10] util.PlatformInfo (PlatformInfo.java:getJobFlowId(71)): Unable to read clusterId from /var/lib/info/job-flow.json, out of places to look
2022-04-20 00:27:50,232 INFO [Thread-10] util.AvroReaderUtil$ (AvroReaderUtil.scala:getFieldParser(245)): Creating default Avro field parser for version 1.7.
2022-04-20 00:27:50,448 INFO [Thread-10] memory.MemoryStore (Logging.scala:logInfo(57)): Block broadcast_0 stored as values in memory (estimated size 311.9 KiB, free 5.8 GiB)
2022-04-20 00:27:51,456 INFO [Thread-10] memory.MemoryStore (Logging.scala:logInfo(57)): Block broadcast_0_piece0 stored as bytes in memory (estimated size 105.0 B, free 5.8 GiB)
2022-04-20 00:27:51,459 INFO [dispatcher-BlockManagerMaster] storage.BlockManagerInfo (Logging.scala:logInfo(57)): Added broadcast_0_piece0 in memory on 172.35.173.163:39319 (size: 105.0 B, free: 5.8 GiB)
2022-04-20 00:27:51,463 INFO [Thread-10] spark.SparkContext (Logging.scala:logInfo(57)): Created broadcast 0 from broadcast at DynamoConnection.scala:53
2022-04-20 00:27:51,481 INFO [Thread-10] memory.MemoryStore (Logging.scala:logInfo(57)): Block broadcast_1 stored as values in memory (estimated size 311.9 KiB, free 5.8 GiB)
2022-04-20 00:27:51,486 INFO [Thread-10] memory.MemoryStore (Logging.scala:logInfo(57)): Block broadcast_1_piece0 stored as bytes in memory (estimated size 105.0 B, free 5.8 GiB)
2022-04-20 00:27:51,487 INFO [dispatcher-BlockManagerMaster] storage.BlockManagerInfo (Logging.scala:logInfo(57)): Added broadcast_1_piece0 in memory on 172.35.173.163:39319 (size: 105.0 B, free: 5.8 GiB)
2022-04-20 00:27:51,490 INFO [Thread-10] spark.SparkContext (Logging.scala:logInfo(57)): Created broadcast 1 from broadcast at DynamoConnection.scala:53
ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8
2022-04-20 00:27:51,681 INFO [dispatcher-BlockManagerMaster] storage.BlockManagerInfo (Logging.scala:logInfo(57)): Removed broadcast_0_piece0 on 172.35.173.163:39319 in memory (size: 105.0 B, free: 5.8 GiB)
2022-04-20 00:27:51,701 INFO [dispatcher-BlockManagerMaster] storage.BlockManagerInfo (Logging.scala:logInfo(57)): Removed broadcast_1_piece0 on 172.35.173.163:39319 in memory (size: 105.0 B, free: 5.8 GiB)
ANTLR Tool version 4.7 used for code generation does not match the current runtime version 4.8
2022-04-20 00:27:54,655 INFO [Thread-10] conf.HiveConf (HiveConf.java:findConfigFile(181)): Found configuration file null
2022-04-20 00:27:54,683 INFO [Thread-10] hive.HiveUtils (Logging.scala:logInfo(57)): Initializing HiveMetastoreConnection version 2.3.7-amzn-4 using Spark classes.
2022-04-20 00:27:54,976 INFO [Thread-10] conf.HiveConf (HiveConf.java:findConfigFile(181)): Found configuration file null
2022-04-20 00:27:55,867 INFO [Thread-10] session.SessionState (SessionState.java:createPath(753)): Created HDFS directory: /tmp/hive/spark
2022-04-20 00:27:55,868 INFO [Thread-10] session.SessionState (SessionState.java:createPath(753)): Created local directory: /tmp/spark
2022-04-20 00:27:55,869 INFO [Thread-10] session.SessionState (SessionState.java:createPath(753)): Created HDFS directory: /tmp/hive/spark/8eacaff7-b865-43fc-87f4-d41cbae09a3f
2022-04-20 00:27:55,870 INFO [Thread-10] session.SessionState (SessionState.java:createPath(753)): Created local directory: /tmp/spark/8eacaff7-b865-43fc-87f4-d41cbae09a3f
2022-04-20 00:27:55,870 INFO [Thread-10] session.SessionState (SessionState.java:createPath(753)): Created HDFS directory: /tmp/hive/spark/8eacaff7-b865-43fc-87f4-d41cbae09a3f/_tmp_space.db
2022-04-20 00:27:55,884 INFO [Thread-10] client.HiveClientImpl (Logging.scala:logInfo(57)): Warehouse location for Hive client (version 2.3.7) is file:/tmp/spark-warehouse
2022-04-20 00:27:56,852 INFO [Thread-10] metastore.AWSGlueClientFactory (AWSGlueClientFactory.java:newClient(56)): Setting glue service endpoint to https://glue.us-east-1.amazonaws.com
2022-04-20 00:27:58,069 INFO [Thread-10] common.FileUtils (FileUtils.java:mkdir(520)): Creating directory if it doesn't exist: file:/tmp/spark-warehouse/hudi_sql_dml_ddl_hive.db
2022-04-20 00:27:58,383 INFO [Thread-10] Configuration.deprecation (Configuration.java:logDeprecation(1394)): io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2022-04-20 00:27:59,701 INFO [Thread-10] codegen.CodeGenerator (Logging.scala:logInfo(57)): Code generated in 288.195849 ms
2022-04-20 00:27:59,745 INFO [Thread-10] codegen.CodeGenerator (Logging.scala:logInfo(57)): Code generated in 25.989248 ms
2022-04-20 00:28:01,110 INFO [Thread-10] catalog.HoodieCatalogTable (Logging.scala:logInfo(57)): Init hoodie.properties for hudi_sql_dml_ddl_hive.hudi_tbl_partitioned
2022-04-20 00:28:01,115 WARN [Thread-10] config.DFSPropertiesConfiguration (DFSPropertiesConfiguration.java:getConfPathFromEnv(188)): Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
2022-04-20 00:28:01,202 WARN [Thread-10] config.DFSPropertiesConfiguration (DFSPropertiesConfiguration.java:addPropsFromFile(131)): Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
2022-04-20 00:28:01,367 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/hudi-spark-sql-test.py", line 54, in <module>
    spark.sql(query_create_tbl)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 723, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: Can't find primaryKey `uuid` in root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- dt: string (nullable = true)
 |-- hh: string (nullable = true)
.
2022-04-20 00:28:01,399 ERROR [main] glueexceptionanalysis.GlueExceptionAnalysisListener (Logging.scala:logError(9)): [Glue Exception Analysis] 
{
    "Event": "GlueETLJobExceptionEvent",
    "Timestamp": 1650414481388,
    "Failure Reason": "Traceback (most recent call last):\n  File \"/tmp/hudi-spark-sql-test.py\", line 54, in <module>\n    spark.sql(query_create_tbl)\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/session.py\", line 723, in sql\n    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)\n  File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n    answer, self.gateway_client, self.target_id, self.name)\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 117, in deco\n    raise converted from None\npyspark.sql.utils.IllegalArgumentException: Can't find primaryKey `uuid` in root\n |-- _hoodie_commit_time: string (nullable = true)\n |-- _hoodie_commit_seqno: string (nullable = true)\n |-- _hoodie_record_key: string (nullable = true)\n |-- _hoodie_partition_path: string (nullable = true)\n |-- _hoodie_file_name: string (nullable = true)\n |-- id: long (nullable = true)\n |-- name: string (nullable = true)\n |-- ts: long (nullable = true)\n |-- dt: string (nullable = true)\n |-- hh: string (nullable = true)\n.",
    "Stack Trace": [
        {
            "Declaring Class": "deco",
            "Method Name": "raise converted from None",
            "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
            "Line Number": 117
        },
        {
            "Declaring Class": "__call__",
            "Method Name": "answer, self.gateway_client, self.target_id, self.name)",
            "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
            "Line Number": 1305
        },
        {
            "Declaring Class": "sql",
            "Method Name": "return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)",
            "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/session.py",
            "Line Number": 723
        },
        {
            "Declaring Class": "<module>",
            "Method Name": "spark.sql(query_create_tbl)",
            "File Name": "/tmp/hudi-spark-sql-test.py",
            "Line Number": 54
        }
    ],
    "Last Executed Line number": 54,
    "script": "hudi-spark-sql-test.py"
}

2022-04-20 00:28:01,471 ERROR [main] glueexceptionanalysis.GlueExceptionAnalysisListener (Logging.scala:logError(9)): [Glue Exception Analysis] Last Executed Line number from script hudi-spark-sql-test.py: 54
2022-04-20 00:28:01,477 INFO [main] glue.ResourceManagerSocketWriter (SocketWriter.scala:start(34)): Socket client for com.amazonaws.services.glue.ResourceManagerSocketWriter started correctly
2022-04-20 00:28:01,486 INFO [main] glue.ProcessLauncher (Logging.scala:logInfo(57)): postprocessing
2022-04-20 00:28:01,487 INFO [main] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): Analyzer: Error stopping analyzer process null
2022-04-20 00:28:01,487 INFO [main] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): Analyzer: Error terminating analyzer process null
2022-04-20 00:28:01,488 INFO [main] glue.LogPusher (Logging.scala:logInfo(57)): stopping
Continuous Logging: Shutting down cloudwatch appender.

Continuous Logging: Shutting down cloudwatch appender.
Continuous Logging: Shutting down cloudwatch appender.
Continuous Logging: Shutting down cloudwatch appender.

2022-04-20 00:28:01,504 INFO [shutdown-hook-0] spark.SparkContext (Logging.scala:logInfo(57)): Invoking stop() from shutdown hook
2022-04-20 00:28:01,517 INFO [spark-listener-group-shared] glueexceptionanalysis.EventLogFileWriter (FileWriter.scala:stop(70)): Logs, events processed and insights are written to file /tmp/glue-exception-analysis-logs/spark-application-1650414465125
2022-04-20 00:28:01,520 INFO [shutdown-hook-0] scheduler.JESSchedulerBackend (Logging.scala:logInfo(57)): Shutting down all executors
2022-04-20 00:28:01,536 INFO [dispatcher-CoarseGrainedScheduler] scheduler.JESSchedulerBackend$JESAsSchedulerBackendEndpoint (Logging.scala:logInfo(57)): Asking each executor to shut down
2022-04-20 00:28:01,555 INFO [dispatcher-event-loop-3] spark.MapOutputTrackerMasterEndpoint (Logging.scala:logInfo(57)): MapOutputTrackerMasterEndpoint stopped!
2022-04-20 00:28:01,556 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): GlueJobEventsInputHandler Interrupted Last event of a job received. Stop event handling.
2022-04-20 00:28:01,556 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): Stopping - analyzer netty server
2022-04-20 00:28:01,561 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): Stopped - analyzer netty server
2022-04-20 00:28:01,561 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): Processing remaining events in queue before termination
2022-04-20 00:28:01,561 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): Closing rules engine
2022-04-20 00:28:01,562 INFO [Thread-6] glue.AnalyzerLogHelper$ (Logging.scala:logInfo(24)): Stopping analyzer log writer
2022-04-20 00:28:01,607 INFO [shutdown-hook-0] memory.MemoryStore (Logging.scala:logInfo(57)): MemoryStore cleared
2022-04-20 00:28:01,607 INFO [shutdown-hook-0] storage.BlockManager (Logging.scala:logInfo(57)): BlockManager stopped
2022-04-20 00:28:01,612 INFO [shutdown-hook-0] storage.BlockManagerMaster (Logging.scala:logInfo(57)): BlockManagerMaster stopped
2022-04-20 00:28:01,616 INFO [dispatcher-event-loop-3] scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint (Logging.scala:logInfo(57)): OutputCommitCoordinator stopped!
2022-04-20 00:28:01,622 INFO [shutdown-hook-0] spark.SparkContext (Logging.scala:logInfo(57)): Successfully stopped SparkContext
2022-04-20 00:28:01,623 INFO [shutdown-hook-0] glue.LogPusher (Logging.scala:logInfo(57)): uploading /tmp/spark-event-logs/ to s3://glue-development-bucket/logs/
2022-04-20 00:28:01,656 INFO [shutdown-hook-0] util.ShutdownHookManager (Logging.scala:logInfo(57)): Shutdown hook called
2022-04-20 00:28:01,657 INFO [shutdown-hook-0] util.ShutdownHookManager (Logging.scala:logInfo(57)): Deleting directory /tmp/spark-4a067bfa-1a27-4b95-b961-1e01f94c5caa/pyspark-c8194dc3-28fb-4e01-a412-cd5f425f032b
2022-04-20 00:28:01,662 INFO [shutdown-hook-0] util.ShutdownHookManager (Logging.scala:logInfo(57)): Deleting directory /tmp/spark-4a067bfa-1a27-4b95-b961-1e01f94c5caa

MoustafaAMahmoud commented 2 years ago

I also tried to add the primaryKey and preCombineField options, but the output table properties don't reflect the default config file. For example, the output table is COW while I expect MOR.

query_create_tbl = f"""
create table if not exists {db_name}.{table_name} (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
)   using hudi
options (
  preCombineField='ts', 
  primaryKey='id', 
  hoodie.table.name = '{table_name}',
  path = '{s3_output_path}',
  hoodie.datasource.hive_sync.table = '{table_name}'
)
partitioned by (dt, hh);
"""

The output table:

CREATE EXTERNAL TABLE `hudi_tbl_partitioned`(
  `_hoodie_commit_time` string, 
  `_hoodie_commit_seqno` string, 
  `_hoodie_record_key` string, 
  `_hoodie_partition_path` string, 
  `_hoodie_file_name` string, 
  `id` bigint, 
  `name` string, 
  `ts` bigint)
PARTITIONED BY ( 
  `dt` string, 
  `hh` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
WITH SERDEPROPERTIES ( 
  'path'='s3://hudi-upgrade/data/hudi_sql_dml_ddl_hive/hudi_tbl_partitioned') 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://hudi-upgrade/data/hudi_sql_dml_ddl_hive/hudi_tbl_partitioned'
TBLPROPERTIES (
  'hoodie.datasource.hive_sync.table'='hudi_tbl_partitioned', 
  'hoodie.table.name'='hudi_tbl_partitioned', 
  'last_commit_time_sync'='20220420003416329', 
  'preCombineField'='ts', 
  'primaryKey'='id', 
  'spark.sql.create.version'='3.1.1-amzn-0', 
  'spark.sql.sources.provider'='hudi', 
  'spark.sql.sources.schema.numPartCols'='2', 
  'spark.sql.sources.schema.numParts'='1', 
  'spark.sql.sources.schema.part.0'='{\"type\":\"struct\",\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"dt\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"hh\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}', 
  'spark.sql.sources.schema.partCol.0'='dt', 
  'spark.sql.sources.schema.partCol.1'='hh', 
  'transient_lastDdlTime'='1650414765', 
  'type'='cow')
MoustafaAMahmoud commented 2 years ago

@zhedoubushishi Is there any configuration missing from my code?

zhedoubushishi commented 2 years ago

@moustafaalaa I cannot reproduce this issue. I just export the external config directly through the shell, something like export HUDI_CONF_DIR=s3://wenningd-xxx/hudi/config, and in the subsequent Hudi code I can see it is able to load the config file from that path.

I am not sure what the best way is to pass an env config to Spark, but one thing you can check is whether you can print the value of HUDI_CONF_DIR inside your Python code, to verify that the env config reaches Spark successfully (see the sketch below).
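For example, a quick check could look like this (it only confirms the Python process sees the variable, not the JVM):

import os

# Print the env var from inside the Glue job to confirm it actually arrives.
print("HUDI_CONF_DIR =", os.environ.get("HUDI_CONF_DIR"))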

Also, are you using client mode or cluster mode? Currently this external config feature does not support Spark cluster mode.

MoustafaAMahmoud commented 2 years ago

Glue runs in cluster mode. I tried to set the environment variable through the Spark session and through a Python environment variable, but it doesn't work.

Then I changed the line below in DFSPropertiesConfiguration.java#L60

From

public static final String DEFAULT_CONF_FILE_DIR = "file:/etc/hudi/conf";

to

public static final String DEFAULT_CONF_FILE_DIR = "file:/tmp";

Then I added the config file to the job's file dependencies so it is copied to the /tmp folder, and it worked.

s3://glue-development-bucket/scripts/hudi-conf/hudi-defaults.conf

I am not sure if there is a better way, but this is the approach that worked for me.
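For completeness, a hedged sketch of the staging step under those assumptions (a rebuilt jar whose DEFAULT_CONF_FILE_DIR is file:/tmp, and the bucket/key from this thread); downloading the file explicitly at job start would be an alternative to listing it as a job file dependency:

import boto3

# Copy hudi-defaults.conf from S3 into /tmp at job start so the rebuilt Hudi jar
# (which now looks in file:/tmp) can find it on the driver.
s3 = boto3.client("s3")
s3.download_file(
    "glue-development-bucket",               # bucket
    "scripts/hudi-conf/hudi-defaults.conf",  # key
    "/tmp/hudi-defaults.conf",               # destination expected by the patched jar
)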

MoustafaAMahmoud commented 2 years ago

I think it would be better to expose this as a Hudi config, for example hoodie.config.path => "s3:path"; the current solution is not generalised.

nsivabalan commented 2 years ago

@umehrot2 @zhedoubushishi: we may need to support this for cluster mode as well. As of now, a code change is required, which is not easy to maintain. Can we come up with a plan to fix this for 0.12?

nsivabalan commented 2 years ago

@moustafaalaa: until we have a proper solution, do you think using an HDFS path instead of a local one would solve the issue? EMR does come with HDFS, and both the driver and executors should be able to access it. Can you give it a try and let us know whether using an HDFS path for HUDI_CONF_DIR helps for now?

zhedoubushishi commented 2 years ago

@nsivabalan Yes, for cluster mode I will send out a PR to fix it. FYI: https://github.com/apache/hudi/pull/5987

zhedoubushishi commented 2 years ago

@moustafaalaa With https://github.com/apache/hudi/pull/5987, similar to EMR, you can add hudi-defaults.conf to spark.yarn.dist.files so that Hudi can load the file (see the sketch below).
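A minimal sketch of that setup (assuming a Hudi build that includes the PR and a YARN-backed runtime; the S3 path is the one used in this thread):

from pyspark.sql import SparkSession

# Ship hudi-defaults.conf to the driver and executors via spark.yarn.dist.files,
# so Hudi can load it without relying on the HUDI_CONF_DIR env variable.
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.yarn.dist.files',
            's3://glue-development-bucket/scripts/hudi-conf/hudi-defaults.conf') \
    .getOrCreate()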

hoodie.config.path => "s3:path" is a good suggestion. We will discuss further whether and how to support it.

nsivabalan commented 2 years ago

Can we close this one out, or is there any pending work item? I see the linked PR has already landed.

nsivabalan commented 2 years ago

Closing this out since the linked PR has landed. Thanks!