apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Bulk upsert? #2946

Closed AkshayChan closed 3 years ago

AkshayChan commented 3 years ago

I am trying to insert/update 15 GB of data into a 165 GB table, but I keep getting the following error:

```
An error occurred while calling o954.pyWriteDynamicFrame.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20210512135309
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device
```

Here is my upsert config:

```python
commonConfig = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': args['precombine_field'],
    'hoodie.datasource.write.recordkey.field': ','.join(primary_keys),
    'hoodie.table.name': table,
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': args['database'],
    'hoodie.datasource.write.partitionpath.field': ','.join(partition_keys),
    'hoodie.datasource.hive_sync.partition_fields': ','.join(partition_keys),
    'hoodie.datasource.hive_sync.table': table,
    'hoodie.datasource.hive_sync.enable': 'true',
    'path': target_path
}

partitionDataConfig = {
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator'
}

incrementalConfig = {
    'hoodie.upsert.shuffle.parallelism': 15,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10000,
    'hoodie.keep.max.commits': 20000,
    'hoodie.keep.min.commits': 10001
}
```

We are using the AWS Glue Connector for Apache Hudi through the AWS Glue Studio Marketplace.
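
For reference, a minimal sketch (not the full job) of how these dicts are combined and handed to the Glue sink; the `combinedConf` and `changesDf` names match the call shown in the stacktrace below, while `glueContext` is assumed to come from the surrounding Glue job setup:

```python
from awsglue.dynamicframe import DynamicFrame

# Merge the three config dicts into the single mapping the marketplace
# connector expects (this is the combinedConf seen in the stacktrace below).
combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}

# Write the change set through the AWS Glue Connector for Apache Hudi.
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(changesDf, glueContext, "changesDf"),
    connection_type="marketplace.spark",
    connection_options=combinedConf,
)
```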

Stacktrace:

```
  File "/tmp/hudi-full-extract-job.py", line 431, in <module>
    glueContext.write_dynamic_frame.from_options(frame = DynamicFrame.fromDF(changesDf, glueContext, "changesDf"), connection_type = "marketplace.spark", connection_options = combinedConf)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o954.pyWriteDynamicFrame.
: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20210512135309
    at org.apache.hudi.client.HoodieWriteClient.upsert(HoodieWriteClient.java:192)
    at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:206)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:151)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:108)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.amazonaws.services.glue.marketplace.connector.SparkCustomDataSink.writeDynamicFrame(CustomDataSink.scala:43)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 77.0 failed 4 times, most recent failure: Lost task 0.3 in stage 77.0 (TID 22943, 172.34.88.19, executor 32): com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:188)
    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:241)
    at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on device
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at net.jpountz.lz4.LZ4BlockOutputStream.flush(LZ4BlockOutputStream.java:240)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:186)
    ... 16 more
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:370)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:370)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:369)
    at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:312)
    at org.apache.hudi.table.WorkloadProfile.buildProfile(WorkloadProfile.java:67)
    at org.apache.hudi.table.WorkloadProfile.<init>(WorkloadProfile.java:59)
    at org.apache.hudi.client.HoodieWriteClient.upsertRecordsInternal(HoodieWriteClient.java:462)
    at org.apache.hudi.client.HoodieWriteClient.upsert(HoodieWriteClient.java:187)
    ... 36 more
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:188)
    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
    at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:241)
    at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.IOException: No space left on device
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at net.jpountz.lz4.LZ4BlockOutputStream.flush(LZ4BlockOutputStream.java:240)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:186)
    ... 16 more
```
n3nash commented 3 years ago

@AkshayChan From the message it seems pretty clear that some of the nodes are running out of disk space:

```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 77.0 failed 4 times, most recent failure: Lost task 0.3 in stage 77.0 (TID 22943, 172.34.88.19, executor 32): com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device
```

I would recommend checking the EMR instances you are provisioning and logging into the boxes while the job is running to see when they run out of space. To give you an idea of how this can happen: whenever Hudi performs an upsert, it shuffles some data around. Spark shuffle has two phases, map and reduce. The map phase spills data to the local disk, using the KryoSerializer to do so, and that is where you are running into this exception.
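
As a hedged sketch (not a verified fix from this thread), these are the standard Spark settings that govern where those map-side spill files land and whether they are compressed. On a managed Glue job you may not be able to change them, and the practical remedy is usually provisioning workers with more local disk, but if you control the cluster you could point the spill directory at a larger volume (the path below is purely illustrative):

```python
from pyspark import SparkConf

# Assumed, illustrative values only -- these must be set before the
# SparkContext is created, which a managed Glue job may not allow.
conf = (
    SparkConf()
    # Directory (or comma-separated list of directories) used for shuffle
    # spill files; point it at a volume with enough free space.
    .set("spark.local.dir", "/mnt/large-volume/spark-tmp")
    # Both default to true; keeping compression on keeps spill files smaller.
    .set("spark.shuffle.compress", "true")
    .set("spark.shuffle.spill.compress", "true")
)
```

Note that the total volume spilled is driven by the size of the upsert itself, so tuning alone will not shrink it much; the error ultimately means the executors' local volumes are too small for this workload.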

Not much I can do here. Let me know if you need anything.

n3nash commented 3 years ago

@AkshayChan Haven't heard from you in a while, so I'm closing this ticket since the resolution is outside of Hudi. Please re-open if you need more help.