apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] When trying to UPSERT, getting errors like: "An error occurred while calling o168.save. org/apache/spark/sql/avro/SchemaConverters$" and "An error occurred while calling o168.save. Failed to upsert for commit time 20230410133751" #8428

Open Madan16 opened 1 year ago

Madan16 commented 1 year ago


Describe the problem you faced: We are trying to upsert into a non-partitioned table. The code had been working fine for every upsert (it ran once a day for almost 2 months), but all of a sudden it started failing for the reasons below: 1) An error occurred while calling o167.save. org/apache/spark/sql/avro/SchemaConverters$. 2) An error occurred while calling o168.save. Failed to upsert for commit time 20230410133751.


To Reproduce

Steps to reproduce the behavior:

  1. Use the code below to perform the upsert (a consolidated, runnable sketch of all three steps also follows this list). Note that the `**` unpacking operators were stripped from the original post by markdown; the stack trace confirms the job calls `.options(**combinedConf)`:

         print('Writing to unpartitioned Hudi table.')
         combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig}
         outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)

  2. Configuration details:

         commonConfig = {'className': 'org.apache.hudi',
                         'hoodie.datasource.hive_sync.use_jdbc': 'false',
                         'hoodie.datasource.write.precombine.field': 'ingest_dt',
                         'hoodie.datasource.write.recordkey.field': primaryKey,
                         'hoodie.table.name': tableName,
                         'hoodie.consistency.check.enabled': 'true',
                         'hoodie.datasource.hive_sync.database': dbName,
                         'hoodie.datasource.hive_sync.table': tableName,
                         'hoodie.datasource.hive_sync.enable': 'true'}

         unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
                                  'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}

         incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 20,
                              'hoodie.datasource.write.operation': 'upsert',
                              'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
                              'hoodie.cleaner.commits.retained': 10}
  3. targetPath: s3 bucket
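
For reference, here is a consolidated, runnable sketch of the write path described in the three steps above. The primary key, table name, database name, source path, and target path are placeholders, not the actual values from the job; only the Hudi options themselves are taken from the issue.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()  # in a Glue job this session already exists

    # Placeholder values -- replace with the job's real ones.
    primaryKey = 'pk_ABC_ky'
    tableName = 'my_hudi_table'
    dbName = 'my_database'
    targetPath = 's3://my-bucket/hudi/my_hudi_table/'

    commonConfig = {
        'className': 'org.apache.hudi',
        'hoodie.datasource.hive_sync.use_jdbc': 'false',
        'hoodie.datasource.write.precombine.field': 'ingest_dt',
        'hoodie.datasource.write.recordkey.field': primaryKey,
        'hoodie.table.name': tableName,
        'hoodie.consistency.check.enabled': 'true',
        'hoodie.datasource.hive_sync.database': dbName,
        'hoodie.datasource.hive_sync.table': tableName,
        'hoodie.datasource.hive_sync.enable': 'true',
    }
    unpartitionDataConfig = {
        'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
        'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    }
    incrementalConfig = {
        'hoodie.upsert.shuffle.parallelism': 20,
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
        'hoodie.cleaner.commits.retained': 10,
    }

    # Merge the three dicts and unpack them into .options() -- the stack trace
    # below confirms the job calls .options(**combinedConf).
    combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig}

    outputDf = spark.read.parquet('s3://my-bucket/staging/source_data/')  # placeholder source
    print('Writing to unpartitioned Hudi table.')
    outputDf.write.format('org.apache.hudi') \
        .options(**combinedConf) \
        .mode('Append') \
        .save(targetPath)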

Expected behavior

The upsert should have succeeded, as the job had been running fine until the errors above started showing up.

Environment Description

Additional context

Column names and data types of the source:

 |-- pk_ABC_ky: string
 |-- A: int
 |-- B: long
 |-- C: date
 |-- D: date
 |-- E: string
 |-- op: string
 |-- source_name: string
 |-- source_schema: string
 |-- source_table: string
 |-- ingest_dt: string

Column names and data types of the target:

 |-- pk_ABC_ky: string
 |-- A: int
 |-- B: bigint
 |-- C: date
 |-- D: date
 |-- E: string
 |-- op: varchar(1)
 |-- source_name: varchar(24)
 |-- source_schema: varchar(24)
 |-- source_table: varchar(13)
 |-- ingest_dt: string
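
For clarity, the source listing above corresponds to a Spark StructType roughly like the sketch below. This is a reconstruction, not code from the job; the comments note where the target Hive types differ.

    from pyspark.sql.types import (StructType, StructField, StringType,
                                   IntegerType, LongType, DateType)

    # Reconstruction of the source DataFrame schema listed above.
    source_schema = StructType([
        StructField('pk_ABC_ky', StringType()),
        StructField('A', IntegerType()),
        StructField('B', LongType()),                 # target stores this as bigint (same width)
        StructField('C', DateType()),
        StructField('D', DateType()),
        StructField('E', StringType()),
        StructField('op', StringType()),              # target: varchar(1)
        StructField('source_name', StringType()),     # target: varchar(24)
        StructField('source_schema', StringType()),   # target: varchar(24)
        StructField('source_table', StringType()),    # target: varchar(13)
        StructField('ingest_dt', StringType()),
    ])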

Stacktrace

23/04/11 13:39:26 ERROR GlueExceptionAnalysisListener: [Glue Exception Analysis] {
    "Event": "GlueETLJobExceptionEvent",
    "Timestamp": 1681220366473,
    "Failure Reason": "Traceback (most recent call last):\n  File \"/tmp/TEST_QA_Hudi.py\", line 216, in <module>\n    outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py\", line 1109, in save\n    self._jwrite.save(path)\n  File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n    answer, self.gateway_client, self.target_id, self.name)\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 111, in deco\n    return f(*a, **kw)\n  File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 328, in get_return_value\n    format(target_id, \".\", name), value)\npy4j.protocol.Py4JJavaError: An error occurred while calling o168.save.\n: java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/SchemaConverters$\n\tat org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:63)\n\tat org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:216)\n\tat org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)\n\tat org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)\n\tat org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)\n\tat org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)\n\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)\n\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)\n\tat 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.SchemaConverters$\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:387)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:418)\n\tat sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:351)\n\t... 41 more\n",
    "Stack Trace": [
        {
            "Declaring Class": "get_return_value",
            "Method Name": "format(target_id, \".\", name), value)",
            "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
            "Line Number": 328
        },
        {
            "Declaring Class": "deco",
            "Method Name": "return f(*a, **kw)",
            "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
            "Line Number": 111
        },
        {
            "Declaring Class": "__call__",
            "Method Name": "answer, self.gateway_client, self.target_id, self.name)",
            "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
            "Line Number": 1305
        },
        {
            "Declaring Class": "save",
            "Method Name": "self._jwrite.save(path)",
            "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
            "Line Number": 1109
        },
        {
            "Declaring Class": "<module>",
            "Method Name": "outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)",
            "File Name": "/tmp/TEST_QA_Hudi.py",
            "Line Number": 216
        }
    ],
    "Last Executed Line number": 216,
    "script": "TEST_QA_Hudi.py"
}

**Stack trace for the error: An error occurred while calling o168.save. Failed to upsert for commit time 20230410133751.**
23/04/10 13:37:56 ERROR GlueExceptionAnalysisListener: [Glue Exception Analysis] {
    "Event": "GlueExceptionAnalysisStageFailed",
    "Timestamp": 1681133876157,
    "Failure Reason": "Job aborted due to stage failure: Task 31 in stage 18.0 failed 4 times, most recent failure: Lost task 31.3 in stage 18.0 (TID 352) (172.34.102.9 executor 7): java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/IncompatibleSchemaException",
    "Stack Trace": [
        {
            "Declaring Class": "org.apache.hudi.HoodieSparkUtils$",
            "Method Name": "$anonfun$createRddInternal$2",
            "File Name": "HoodieSparkUtils.scala",
            "Line Number": 137
        },
        {
            "Declaring Class": "org.apache.spark.rdd.RDD",
            "Method Name": "$anonfun$mapPartitions$2",
            "File Name": "RDD.scala",
            "Line Number": 863
        },
        {
            "Declaring Class": "org.apache.spark.rdd.RDD",
            "Method Name": "$anonfun$mapPartitions$2$adapted",
            "File Name": "RDD.scala",
            "Line Number": 863
        },
        {
            "Declaring Class": "org.apache.spark.rdd.MapPartitionsRDD",
            "Method Name": "compute",
            "File Name": "MapPartitionsRDD.scala",
            "Line Number": 52
        },
        {
            "Declaring Class": "org.apache.spark.rdd.RDD",
            "Method Name": "computeOrReadCheckpoint",
            "File Name": "RDD.scala",
            "Line Number": 373
        },
        {
            "Declaring Class": "org.apache.spark.rdd.RDD",
            "Method Name": "iterator",
            "File Name": "RDD.scala",
            "Line Number": 337
        },
        {
            "Declaring Class": "org.apache.spark.rdd.MapPartitionsRDD",
            "Method Name": "compute",
            "File Name": "MapPartitionsRDD.scala",
            "Line Number": 52
        },
        {
            "Declaring Class": "org.apache.spark.rdd.RDD",
            "Method Name": "computeOrReadCheckpoint",
            "File Name": "RDD.scala",
            "Line Number": 373
        },
        {
            "Declaring Class": "org.apache.spark.rdd.RDD",
            "Method Name": "iterator",
            "File Name": "RDD.scala",
            "Line Number": 337
        },
        {
            "Declaring Class": "org.apache.spark.rdd.MapPartitionsRDD",
            "Method Name": "compute",
            "File Name": "MapPartitionsRDD.scala",
            "Line Number": 52
        },
        {
            "Declaring Class": "org.apache.spark.rdd.RDD",
            "Method Name": "computeOrReadCheckpoint",
            "File Name": "RDD.scala",
            "Line Number": 373
        },
        {
            "Declaring Class": "org.apache.spark.rdd.RDD",
            "Method Name": "iterator",
            "File Name": "RDD.scala",
            "Line Number": 337
        },
        {
            "Declaring Class": "org.apache.spark.shuffle.ShuffleWriteProcessor",
            "Method Name": "write",
            "File Name": "ShuffleWriteProcessor.scala",
            "Line Number": 59
        },
        {
            "Declaring Class": "org.apache.spark.scheduler.ShuffleMapTask",
            "Method Name": "runTask",
            "File Name": "ShuffleMapTask.scala",
            "Line Number": 99
        },
        {
            "Declaring Class": "org.apache.spark.scheduler.ShuffleMapTask",
            "Method Name": "runTask",
            "File Name": "ShuffleMapTask.scala",
            "Line Number": 52
        },
        {
            "Declaring Class": "org.apache.spark.scheduler.Task",
            "Method Name": "run",
            "File Name": "Task.scala",
            "Line Number": 131
        },
        {
            "Declaring Class": "org.apache.spark.executor.Executor$TaskRunner",
            "Method Name": "$anonfun$run$3",
            "File Name": "Executor.scala",
            "Line Number": 497
        },
        {
            "Declaring Class": "org.apache.spark.util.Utils$",
            "Method Name": "tryWithSafeFinally",
            "File Name": "Utils.scala",
            "Line Number": 1439
        },
        {
            "Declaring Class": "org.apache.spark.executor.Executor$TaskRunner",
            "Method Name": "run",
            "File Name": "Executor.scala",
            "Line Number": 500
        },
        {
            "Declaring Class": "java.util.concurrent.ThreadPoolExecutor",
            "Method Name": "runWorker",
            "File Name": "ThreadPoolExecutor.java",
            "Line Number": 1149
        },
        {
            "Declaring Class": "java.util.concurrent.ThreadPoolExecutor$Worker",
            "Method Name": "run",
            "File Name": "ThreadPoolExecutor.java",
            "Line Number": 624
        },
        {
            "Declaring Class": "java.lang.Thread",
            "Method Name": "run",
            "File Name": "Thread.java",
            "Line Number": 750
        },
        {
            "Declaring Class": " java.lang.ClassNotFoundException: org.apache.spark.sql.avro.IncompatibleSchemaException",
            "Method Name": "CausedBy",
            "File Name": "CausedBy",
            "Line Number": -1
        },
        {
            "Declaring Class": "java.net.URLClassLoader",
            "Method Name": "findClass",
            "File Name": "URLClassLoader.java",
            "Line Number": 387
        },
        {
            "Declaring Class": "java.lang.ClassLoader",
            "Method Name": "loadClass",
            "File Name": "ClassLoader.java",
            "Line Number": 418
        },
        {
            "Declaring Class": "sun.misc.Launcher$AppClassLoader",
            "Method Name": "loadClass",
            "File Name": "Launcher.java",
            "Line Number": 352
        },
        {
            "Declaring Class": "java.lang.ClassLoader",
            "Method Name": "loadClass",
            "File Name": "ClassLoader.java",
            "Line Number": 351
        }
    ],
    "Stage ID": 18,
    "Stage Attempt ID": 0,
    "Number of Tasks": 40
}

AND

23/04/10 13:37:56 ERROR GlueExceptionAnalysisListener: [Glue Exception Analysis] {
    "Event": "GlueETLJobExceptionEvent",
    "Timestamp": 1681133876669,
    "Failure Reason": "Traceback (most recent call last):\n  File \"/tmp/TEST_QA_Hudi.py\", line 216, in <module>\n    outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py\", line 1109, in save\n    self._jwrite.save(path)\n  File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n    answer, self.gateway_client, self.target_id, self.name)\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 111, in deco\n    return f(*a, **kw)\n  File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 328, in get_return_value\n    format(target_id, \".\", name), value)\npy4j.protocol.Py4JJavaError: An error occurred while calling o168.save.\n: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20230410133751\n\tat org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)\n\tat org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46)\n\tat org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:98)\n\tat org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:88)\n\tat org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:157)\n\tat org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)\n\tat org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:265)\n\tat org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)\n\tat org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)\n\tat org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)\n\tat org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)\n\tat 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)\n\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)\n\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)\n\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 18.0 failed 4 times, most recent failure: Lost task 31.3 in stage 18.0 (TID 352) (172.34.102.9 executor 7): java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/IncompatibleSchemaException\n\tat org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$2(HoodieSparkUtils.scala:137)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.IncompatibleSchemaException\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:387)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:418)\n\tat 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:351)\n\t... 22 more\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)\n\tat scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)\n\tat scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)\n\tat org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:414)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:1029)\n\tat org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:366)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:414)\n\tat org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:366)\n\tat org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)\n\tat org.apache.hudi.index.bloom.SparkHoodieBloomIndex.lookupIndex(SparkHoodieBloomIndex.java:114)\n\tat org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:84)\n\tat org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:60)\n\tat org.apache.hudi.table.action.commit.AbstractWriteHelper.tag(AbstractWriteHelper.java:69)\n\tat org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:51)\n\t... 
45 more\nCaused by: java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/IncompatibleSchemaException\n\tat org.apache.hudi.HoodieSparkUtils$.$anonfun$createRddInternal$2(HoodieSparkUtils.scala:137)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n\tat org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:131)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t... 1 more\nCaused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.IncompatibleSchemaException\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:387)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:418)\n\tat sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:351)\n\t... 22 more\n",
    "Stack Trace": [
        {
            "Declaring Class": "get_return_value",
            "Method Name": "format(target_id, \".\", name), value)",
            "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
            "Line Number": 328
        },
        {
            "Declaring Class": "deco",
            "Method Name": "return f(*a, **kw)",
            "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
            "Line Number": 111
        },
        {
            "Declaring Class": "__call__",
            "Method Name": "answer, self.gateway_client, self.target_id, self.name)",
            "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
            "Line Number": 1305
        },
        {
            "Declaring Class": "save",
            "Method Name": "self._jwrite.save(path)",
            "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
            "Line Number": 1109
        },
        {
            "Declaring Class": "<module>",
            "Method Name": "outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)",
            "File Name": "/tmp/TEST_QA_Hudi.py",
            "Line Number": 216
        }
    ],
    "Last Executed Line number": 216,
    "script": "TEST_QA_Hudi.py"
}
ad1happy2go commented 1 year ago

@Madan16 This looks like an Avro library mismatch issue. Since you say this has been running fine for 2 months, do you know if any AWS library or anything else was updated recently?

Madan16 commented 1 year ago

> @Madan16 This looks like an Avro library mismatch issue. Since you say this has been running fine for 2 months, do you know if any AWS library or anything else was updated recently?

@ad1happy2go: Sorry, but I could not understand your question. Can you please be more specific so that I can provide more details? Note: I am running this (PySpark) code in AWS Glue.

ad1happy2go commented 1 year ago

@Madan16 I wanted to ask whether you were using AWS Glue version 3.0 from the start (i.e., when the job was succeeding).

My guess is that a version mismatch is somehow happening, resulting in a ClassNotFoundException for SchemaConverters, which is not present in older Avro versions.
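
For what it's worth, a quick way to check from inside the job whether the spark-avro class Hudi fails on is visible is a small py4j probe like the sketch below. This is only a diagnostic idea, not something from the original job, and it relies on the non-public spark._jvm handle; it also only checks the driver classpath, while the second stack trace shows the class is missing on the executors as well.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Ask the JVM to load the exact class named in the error. If spark-avro is
    # not on the driver classpath, this raises a Py4JJavaError wrapping
    # java.lang.ClassNotFoundException, mirroring the failure in this issue.
    try:
        spark._jvm.java.lang.Class.forName('org.apache.spark.sql.avro.SchemaConverters$')
        print('spark-avro is on the driver classpath')
    except Exception as err:
        print('spark-avro is NOT on the driver classpath:', err)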

Madan16 commented 1 year ago

> @Madan16 I wanted to ask whether you were using AWS Glue version 3.0 from the start (i.e., when the job was succeeding).
>
> My guess is that a version mismatch is somehow happening, resulting in a ClassNotFoundException for SchemaConverters, which is not present in older Avro versions.

@ad1happy2go: Yes, Glue 3.0 since the beginning.

ad1happy2go commented 1 year ago

@Madan16 Were you able to resolve it? Can you try using our Hudi bundle jar once, instead of using --datalake-formats?
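
For anyone who wants to try that on Glue 3.0, a minimal sketch of the job parameters it could amount to is below. The S3 bucket, jar paths, and versions are placeholders, not values from this issue; the assumption is that a Hudi Spark 3.1 bundle and a matching spark-avro jar (Glue 3.0 runs Spark 3.1.1 / Scala 2.12) have already been uploaded to S3.

    # Glue job parameters ("Job parameters" / default arguments) that load
    # explicit jars via --extra-jars instead of relying on --datalake-formats.
    # All S3 paths and versions below are placeholders.
    glue_job_parameters = {
        # Comma-separated S3 paths; Glue adds these jars to both the driver and
        # executor classpaths, which is where the NoClassDefFoundError for the
        # org.apache.spark.sql.avro.* classes was raised.
        '--extra-jars': ('s3://my-bucket/jars/hudi-spark3.1-bundle_2.12-0.12.3.jar,'
                         's3://my-bucket/jars/spark-avro_2.12-3.1.1.jar'),
        # Hudi's Spark datasource generally expects Kryo serialization.
        '--conf': 'spark.serializer=org.apache.spark.serializer.KryoSerializer',
    }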

Madan16 commented 1 year ago

Hello all and @ad1happy2go: Something strange is happening...

  1. When I tried to run with Glue 4.0, it ran fine without any issue.
  2. The same code is running fine in a different AWS account with Glue 2.0 (which supports Spark 2.4, Scala 2, and Python 3) and the Apache Hudi Connector from the AWS Marketplace.
  3. The problem only happens with AWS Glue 3.0.

This is really strange. Why does this code no longer run fine with Glue 3.0, when it had been running fine until this issue appeared?

ad1happy2go commented 1 year ago

@Madan16 This is strange, as the code just stopped working with Glue 3.0. It should be something related to AWS specifically; I am not able to reproduce this issue either. Can we use Glue 4.0 for your use case for now? If you still need to use Glue 3.0, I suggest you use a compiled Hudi bundle jar instead of relying on the AWS-provided --datalake-formats.

Madan16 commented 1 year ago

@ad1happy2go I used Glue 4.0 and it ran fine, as I said in my previous comments.