Can you provide a simple reproduce step, like code or SQL?
> Can you provide a simple reproduce step, like code or SQL?
Here is my code:

```python
# -*- coding: utf-8 -*-
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

if __name__ == '__main__':
    print("_____Run__")

    spark_conf = SparkConf()
    spark_conf.set("spark.jars.packages",
                   "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.0,"
                   "org.apache.spark:spark-avro_2.12:3.3.1")
    spark_conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    spark_conf.set("spark.sql.hive.convertMetastoreParquet", "false")
    spark_conf.set("spark.rdd.compress", "true")

    sparkSession = (SparkSession
                    .builder
                    .appName('read_hudi')
                    .config(conf=spark_conf)
                    .getOrCreate())

    file_path = "hdfs://hdfs_host:9000/data/dwd/trans_event"

    # READ HUDI
    df_load = sparkSession.read.format("org.apache.hudi").load(
        file_path, inferSchema=True, header=True
    )

    # QUERY VIA TEMP VIEW
    df_load.createOrReplaceTempView("trans_event")
    query_e34_trans_raw = """
        SELECT transaction_data.profile_id FROM trans_event LIMIT 10
    """
    df_load_profile_trans = sparkSession.sql(sqlQuery=query_e34_trans_raw)
    df_load_profile_trans.printSchema()
    df_load_profile_trans.show(10)

    sparkSession.stop()
    print("_____________End______________")
```
Does it work locally, i.e. not on k8s?
> Does it work locally, i.e. not on k8s?

Yes, it works locally when I set `spark_conf.setMaster("local[*]")`. It can work on k8s too, but only without creating executors, running everything in a single driver.
Have you set some configs in your submit command, something like:
```bash
pyspark \
  --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
```
It looks like the environment differs between submit modes. I think you should confirm the environment; for example, if you run in cluster mode and use a YARN archive, make sure the Hudi jar is in the archive, and so on.
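If it helps, here is a quick sketch of how to confirm what is actually on the classpath in k8s (pod names and paths below are placeholders, not from this issue):

```bash
# Check that the Hudi bundle is visible to BOTH the driver and the executors,
# not just resolved on the submitting side. Pod names are placeholders.
kubectl exec <driver-pod> -- ls /opt/spark/jars | grep -i hudi
kubectl exec <executor-pod> -- ls /opt/spark/jars | grep -i hudi

# With spark.jars.packages, resolved jars land in the driver's Ivy cache
# (the exact path depends on the image's user home).
kubectl exec <driver-pod> -- ls -R /root/.ivy2/jars
```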
@TranHuyTiep Were you able to resolve this issue, or are you still facing the same?
cc @harsh1231
@TranHuyTiep I have the same environment on k8s. How can I connect with you?
> @TranHuyTiep Were you able to resolve this issue, or are you still facing the same?

I solved the above problem by building a new image and copying all the packages in .ivy2/jars/* to /opt/spark/jars/.
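For reference, a minimal sketch of that image build (the base image matches the apache/spark-py:v3.3.1 used in this issue; the tag and local paths are assumptions):

```bash
# Sketch of the fix described above: bake the Ivy-resolved dependency jars
# into the image's jars directory so executors no longer depend on runtime
# dependency resolution via spark.jars.packages. Paths and tag are assumptions.
cp -r ~/.ivy2/jars ./resolved-jars

cat > Dockerfile <<'EOF'
FROM apache/spark-py:v3.3.1
COPY resolved-jars/ /opt/spark/jars/
EOF

docker build -t spark-py-hudi:3.3.1 .
```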
Thanks @TranHuyTiep. Closing the issue as you were able to fix it. Please reopen if you see the issue again.
> I solved the above problem by building a new image and copying all the packages in .ivy2/jars/* to /opt/spark/jars/.

Same here on Kubernetes. It sounds like k8s does not work well with spark.jars.packages and Hudi.
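One hypothetical alternative to spark.jars.packages (not what was done in this thread) is to reference a bundle jar that already exists inside the image via the local:// scheme, so nothing has to be fetched at runtime:

```bash
# Hypothetical sketch: API server address and file paths are placeholders.
# local:// tells Spark the file is already present inside the container image.
spark-submit \
  --master k8s://https://<apiserver-host>:6443 \
  --deploy-mode cluster \
  --jars local:///opt/spark/jars/hudi-spark3.3-bundle_2.12-0.13.0.jar \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  local:///opt/spark/work-dir/demo.py
```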
**Describe the problem you faced**

**Environment Description**

**Additional context**
YAML file:

```yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: Demo
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: apache/spark-py:v3.3.1
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/work-dir/demo.py
  sparkVersion: "3.3.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "1024m"
    labels:
      version: 3.3.1
    serviceAccount: spark
    envFrom:
```
It throws an error:
```
INFO DAGScheduler: Job 0 failed: collect at HoodieSparkEngineContext.java:137, took 0.609365 s
Traceback (most recent call last):
  File "/opt/spark/work-dir/demo.py", line 73, in <module>
    df_load.show(10)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 606, in show
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o70.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.244.0.45 executor 2): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
```