Open dataproblems opened 1 month ago
After reading this issue, I tried adding the following configuration to my hudi options:
"hoodie.write.markers.type" -> "DIRECT",
"hoodie.embed.timeline.server" -> "false"
But that failed as well, with this exception in the executors:
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_422]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_422]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_422]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_422]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[?:1.8.0_422]
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:258) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
24/10/16 22:18:19 ERROR TransportResponseHandler: Still have 8 requests outstanding when connection from ip-10-0-175-8.ec2.internal/10.0.175.8:39085 is closed
24/10/16 22:18:19 WARN BlockManager: Putting block taskresult_3875 failed due to exception org.apache.spark.SparkException: Exception thrown in awaitResult: .
24/10/16 22:18:19 WARN BlockManager: Putting block taskresult_3876 failed due to exception org.apache.spark.SparkException: Exception thrown in awaitResult: .
24/10/16 22:18:19 ERROR Utils: Aborting task
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.scheduler.OutputCommitCoordinator.canCommit(OutputCommitCoordinator.scala:104) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:449) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1550) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_422]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_422]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_422]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_422]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[?:1.8.0_422]
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:258) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
... 1 more
@dataproblems Looks like the issue is with the timeline server. Can you disable timeline-server-based markers?
Set hoodie.write.markers.type to DIRECT and try once.
If this also doesn't work, you can try disabling the timeline server itself by setting hoodie.embed.timeline.server to false.
Can you please give more information about your environment? I just want to understand the issue with the timeline server.
@ad1happy2go - Did you mean setting `hoodie.write.markers.type` to `DIRECT`? I see only two values for `hoodie.write.markers.type`, which are `TIMELINE_SERVER_BASED` and `DIRECT`. I also tried setting `hoodie.embed.timeline.server` to `false`. See this comment.
I'm running this on EMR release 6.11.0. What other information (other than the Spark version / spark-submit command) would you need?
@dataproblems Thanks, and sorry for overlooking that comment. The error in this comment https://github.com/apache/hudi/issues/12116#issuecomment-2418081362 is very generic. Do you see any other exception? Can you check the node health? Are the executor nodes going down due to OOM?
@dataproblems Can you share the entire driver logs from the run without timeline-server-based markers? From the exception alone it's not clear which part of the code is breaking.
@ad1happy2go - I see a couple of entries similar to:
java.io.EOFException: Unexpected EOF while trying to read response from server
at org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:538) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
at org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1137) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
24/10/17 16:24:27 WARN DataStreamer: Error Recovery for BP-2036047854-10.0.175.8-1729115502972:blk_1073745219_6368 in pipeline [DatanodeInfoWithStorage[10.0.165.76:9866,DS-5bdd769f-3dcf-4c53-8660-6a8bac6b1d0b,DISK], DatanodeInfoWithStorage[10.0.173.83:9866,DS-f05105bd-790b-4c8c-8767-2b059159b484,DISK], DatanodeInfoWithStorage[10.0.161.195:9866,DS-f4f690e7-6369-42b8-9c84-566d7dbd29f3,DISK]]: datanode 0(DatanodeInfoWithStorage[10.0.165.76:9866,DS-5bdd769f-3dcf-4c53-8660-6a8bac6b1d0b,DISK]) is bad.
in the driver logs. Does that give you an idea? (I'm not sure if I can share the entire log file.)
I also see:
org.apache.hudi.exception.HoodieIOException: Failed to create file s3://redacted_table_path/redacted_parition/.hoodie_partition_metadata
at org.apache.hudi.storage.HoodieStorage.createImmutableFileInPath(HoodieStorage.java:326) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.common.model.HoodiePartitionMetadata.lambda$trySave$19fcee3a$1(HoodiePartitionMetadata.java:115) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.common.util.RetryHelper.start(RetryHelper.java:85) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.common.util.RetryHelper.start(RetryHelper.java:113) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.common.model.HoodiePartitionMetadata.trySave(HoodiePartitionMetadata.java:121) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.io.storage.row.HoodieRowCreateHandle.<init>(HoodieRowCreateHandle.java:142) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper.createHandle(BulkInsertDataInternalWriterHelper.java:217) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper.getRowCreateHandle(BulkInsertDataInternalWriterHelper.java:203) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper.write(BulkInsertDataInternalWriterHelper.java:125) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.spark3.internal.HoodieBulkInsertDataInternalWriter.write(HoodieBulkInsertDataInternalWriter.java:62) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.spark3.internal.HoodieBulkInsertDataInternalWriter.write(HoodieBulkInsertDataInternalWriter.java:38) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1550) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:s3://redacted_table_path/redacted_partition/.hoodie_partition_metadata
at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.plan(RegularUploadPlanner.java:30) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.UploadPlannerChain.plan(UploadPlannerChain.java:37) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:342) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1202) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1182) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1071) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:207) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
at org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem.lambda$create$2(HoodieWrapperFileSystem.java:242) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:118) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem.create(HoodieWrapperFileSystem.java:241) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.storage.hadoop.HoodieHadoopStorage.create(HoodieHadoopStorage.java:128) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
at org.apache.hudi.storage.HoodieStorage.createImmutableFileInPath(HoodieStorage.java:321) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
... 22 more
this exception in one of the executor logs that I spot checked.
Hi @dataproblems,
Additionally, you mentioned that an OOM error is occurring. After setting `hoodie.write.markers.type` to `DIRECT`, are you still experiencing the OOM issue?
Hi @rangareddy - I still see exit code 137 in the driver log. I only checked a few of the executor logs and pasted the exception in my previous comment. This is a single writer creating the base table.
I also tried creating the base table with less than 1% of my data (around 100 GB), and the job just gets stuck (see attached screenshot). I can see that the data files are there in S3, but the commit file isn't there yet. I'm also not sure what Hudi is doing in that stage. ss.pdf
@dataproblems I noticed that only 400 tasks are getting created. This may be the main problem, as those tasks are already taking more than 18 minutes. Can you find out why only 400 tasks were created?
What is the nature of your input data? It looks like your input DataFrame is only creating 400 partitions. Can you try a repartition before saving to see if that helps?
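For illustration, a repartition before the Hudi write could look like the sketch below; `inputDf`, the `hudiOptions` map name, and the partition count of 2000 are placeholders, not values taken from this job:

```scala
// Redistribute the input so the write stage gets more tasks than the
// source file layout would otherwise produce.
val repartitioned = inputDf.repartition(2000) // size to data volume / cluster cores

repartitioned.write.format("hudi").
  options(hudiOptions).
  mode(org.apache.spark.sql.SaveMode.Overwrite).
  save(basePath)
```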
@ad1happy2go - That's likely because I'm setting the bulk insert shuffle parallelism to 400. I've tried other values, but they all result in similar outcomes. Can you elaborate on the question about the nature of the data? Here's another screenshot where the partition / parallelism count was higher.
@dataproblems Can you remove this config and try again? Since Hudi 0.14.x, we auto-calculate the required parallelism.
In the above screenshot, can you expand the failure reason and send the stack trace?
@ad1happy2go - I've removed that config for the second screenshot. I couldn't find the executor with the exact error message shown in the screenshot, but here's something that I do see:
24/10/21 04:15:25 INFO ShuffleBlockFetcherIterator: Started 40 remote fetches in 1160 ms
24/10/21 04:15:25 INFO ShuffleBlockFetcherIterator: Started 42 remote fetches in 1160 ms
24/10/21 04:15:25 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException: null
at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:267) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_addToSorter_0$(Unknown Source) ~[?:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_doSort_0$(Unknown Source) ~[?:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) ~[?:?]
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hasNext(Unknown Source) ~[?:?]
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:959) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:464) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
24/10/21 04:15:25 ERROR DataWritingSparkTask: Aborting commit for partition 4117 (task 20594, attempt 0, stage 23.0)
24/10/21 04:15:25 ERROR DataWritingSparkTask: Aborted commit for partition 4117 (task 20594, attempt 0, stage 23.0)
24/10/21 04:15:25 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException: null
at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:267) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_addToSorter_0$(Unknown Source) ~[?:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_doSort_0$(Unknown Source) ~[?:?]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) ~[?:?]
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hasNext(Unknown Source) ~[?:?]
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:959) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:464) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
24/10/21 04:15:25 ERROR DataWritingSparkTask: Aborting commit for partition 4116 (task 20593, attempt 0, stage 23.0)
24/10/21 04:15:25 ERROR DataWritingSparkTask: Aborted commit for partition 4116 (task 20593, attempt 0, stage 23.0)
24/10/21 04:15:25 INFO Executor: Executor killed task 4117.0 in stage 23.0 (TID 20594), reason: Stage cancelled
24/10/21 04:15:25 INFO Executor: Executor killed task 4116.0 in stage 23.0 (TID 20593), reason: Stage cancelled
24/10/21 04:15:26 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
Also, for some reason the driver claims that the application succeeded, but the Spark UI as well as the data sink in S3 show that the data was never written completely:
24/10/21 04:37:15 INFO ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 1190.
24/10/21 04:37:17 INFO ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. ip-10-0-164-85.ec2.internal:46343
24/10/21 04:37:17 INFO ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. ip-10-0-164-85.ec2.internal:46343
24/10/21 04:37:17 INFO ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
24/10/21 04:37:17 INFO ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
24/10/21 04:37:17 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
24/10/21 04:37:18 INFO ApplicationMaster: Deleting staging directory hdfs://ip-10-0-164-85.ec2.internal:8020/user/hadoop/.sparkStaging/application_1729482915237_0001
24/10/21 04:37:18 INFO ShutdownHookManager: Shutdown hook called
The log entry `Driver terminated or disconnected` is a little weird too.
@dataproblems This doesn't give much insight. Is it possible to attach the complete driver log and one executor log?
@ad1happy2go - Sure. Here you go. You will see the stack trace in the driver.log file. driver.log executor.log
@dataproblems Any reason why you are using such a high --conf spark.executor.heartbeatInterval=900s? It should be much lower than spark.network.timeout. Can you try leaving these at their defaults? I see a lot of issues with the Spark configs.
You shouldn't be using --conf spark.driver.maxResultSize=0 either, as driver result collection will then have no limit at all. You may increase it up to 4 GB if required, but keep a check on it.
Also, did you try turning the timeline server back on, i.e. updating only "hoodie.embed.timeline.server" to "true", since it is used to build the file system view? Keep the markers as DIRECT.
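For reference, the defaults-leaning configuration being suggested might look like this sketch (the jar name is a placeholder, and the exact values should be tuned to the job):

```shell
# Spark defaults: heartbeatInterval=10s, network.timeout=120s.
# The heartbeat interval must stay well below the network timeout.
spark-submit \
  --conf spark.executor.heartbeatInterval=10s \
  --conf spark.network.timeout=120s \
  --conf spark.driver.maxResultSize=4g \
  your-hudi-job.jar
```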
@ad1happy2go - We were getting heartbeat timeout exceptions, which is why we increased those values. Same with spark.driver.maxResultSize: I got an exception about the driver result being larger than the 5g limit I was using previously, so I removed the limit to mitigate that error.
When I tried using DIRECT markers but set `hoodie.embed.timeline.server` to `true`, I still got a similar error (see screenshot below).
@dataproblems There is something wrong in your setup if such a large data size is getting collected to the driver. Is it possible to share the Hudi timeline? What's the size of the commit files?
@ad1happy2go - Given that this job is creating the table, there is only a single commit requested. Both the commit.requested and commit.inflight objects are 0 B in size, and we never get to the .commit file because the job fails before writing all of the data.
On a separate note: when I disable `POPULATE_META_FIELDS` (on 0.15.0), the commit file for a small 100 GB sample of my complete dataset is around 40 MB. Do you think the process that creates this commit file is what's failing for my complete dataset? The complete dataset is several TB in size, so the size of the `.commit` file will grow, and maybe that's causing the issues I'm facing?
The Spark job is merely reading from S3 and writing the data back in Hudi format; there are no operations on our end that would cause the dataset to be collected on the driver, so I would defer to you on that front. Usually the failure is in the mapToPair operation in the HoodieJavaRdd file or in the save operation, as seen in the previous screenshots.
@dataproblems There is some problem here. For 100 GB of data you should not have a 40 MB commit file. One possible reason is the nature of your partition column. How many partitions do you have, i.e. how many unique values are there in your dataset for your partition column?
Another reason can be that you have a lot of small files in the input. If that's the case, can you try repartitioning the dataframe before saving it?
@ad1happy2go, I have 6 partitions for the sample dataset that I'm using:
| Partition | Number of unique values |
| --- | --- |
| One | 12959311 |
| Two | 629845160 |
| Three | 458227144 |
| Four | 1107519580 |
| Five | 472111 |
| Six | 19391133 |
Let me update you on how the repartition exercise goes and whether it results in a smaller `.commit` file.
=> Update: for my sample data I tried an extreme case of `repartition(..)`, using numPartitions equal to the total number of unique partition column values, and the commit file shrank from ~40 MB to ~50 KB. Next I'm trying to enable the `POPULATE_META_FIELDS` flag along with the repartition to see if I can create the index with the sample data.
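That extreme repartition can be sketched as follows; `inputDf` stands in for the actual dataframe, and "partition" is assumed to be the Hudi partition path column:

```scala
import org.apache.spark.sql.functions.col

// One Spark partition per unique partition-column value keeps the file
// count (and thus the commit metadata size) small.
val numPartitionValues = inputDf.select("partition").distinct().count().toInt
val repartitioned = inputDf.repartition(numPartitionValues, col("partition"))
```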
=> Update: I tried using .repartition with `POPULATE_META_FIELDS` set to true, but haven't had luck with the job completing. I see that the data has been written to S3, but there is no `.commit` file.
Our main problem is that we're not able to set `POPULATE_META_FIELDS` to `true` and create the index. The job fails after writing partial data to S3 due to executor heartbeat / OOM issues. Do you think GC could be a culprit there?
We also decided to give random data a try with Hudi, to see whether the problem we're running into is due to the nature of our data or something else. Here's the script we used to generate the data and create the Hudi table:
import java.util.UUID
import scala.util.Random
case class RandomData(id: Long, uuid: String, ts: Long = 28800000L, partition: String)
val partitions = List("One", "Two", "Three", "Four")
val randomData = spark.range(1, 10 * 10000000L).map(f => RandomData(id = f, uuid = UUID.randomUUID.toString, partition = Random.shuffle(partitions).head))
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import spark.implicits._
import org.apache.hudi.keygen.SimpleKeyGenerator
val inserts = spark.read.parquet("$somePath")
val tableName = "random_table"
val basePath = "$someHudiPath"
val bulkWriteOptions: Map[String, String] = Map(
DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
"hoodie.parquet.small.file.limit" -> "1073741824",
HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
HoodieWriteConfig.BULK_INSERT_SORT_MODE.key() -> BulkInsertSortMode.GLOBAL_SORT.name(),
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
"hoodie.metadata.record.index.enable" -> "true",
"hoodie.metadata.enable" -> "true",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.clustering.inline" -> "true",
"hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
"hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824",
"hoodie.datasource.write.partitionpath.field" -> "partition",
"hoodie.datasource.write.recordkey.field" -> "id",
"hoodie.datasource.write.precombine.field" -> "ts",
"hoodie.table.name" -> tableName,
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName,
"hoodie.write.markers.type" -> "DIRECT",
"hoodie.embed.timeline.server" -> "false"
)
inserts.repartition(100).write.format("hudi").
options(bulkWriteOptions).
mode(Overwrite).
save(basePath)
| Data Size | Commit Requested | Commit | Total Time |
| --- | --- | --- | --- |
| 47 MB | 00:03 | 00:40 | 37 seconds |
| 400 MB | 00:24 | 02:13 | 1 minute 49 seconds |
| 800 MB | 00:08 | 03:02 | 2 minutes 54 seconds |
| 1.9 GB | 00:38 | 07:03 | 6 minutes 25 seconds |
| 3.8 GB | 00:28 | 16:55 | 16 minutes 27 seconds |
| 37 GB | Didn't create the table (still in running / stuck state after 88 minutes) | | |
From my observations, table creation time increases roughly linearly at first, but from 1.9 GB -> 3.8 GB it becomes non-linear. Also, we were not able to get the table created for the 37 GB dataset. Can you see if you can reproduce this on your end? It would be useful to learn what parameters and configuration worked for you. I tried the random data example with and without the repartition; both times, I saw that partial data was written to S3 and then the job went into the long pause / running state. See attached screenshot.
@ad1happy2go - did you get a chance to take a look at the results / data I posted last week?
@dataproblems For Experiment 2, you can try increasing the executor memory overhead. You can also check the GC time under stages to see if that is a problem. I see you are already tuning your GC as mentioned in this doc - https://hudi.apache.org/docs/tuning-guide/
For Experiment 3 - I can clearly see the problem is with parallelism. It's creating only 100 tasks, and they have been running for 1.6 h. Can you try increasing the parallelism in this case? To do this, you have to scale the repartition factor (100 in your case) along with the dataset size.
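As a rough illustration of the sizing suggestion above (my own back-of-the-envelope helper, not anything from Hudi or the thread), one way to pick a repartition factor is to target a fixed number of input bytes per Spark task:

```scala
// Hypothetical helper: estimate a repartition factor from dataset size,
// targeting a fixed amount of input data per task (128 MB here, a common default).
object PartitionSizing {
  def estimatePartitions(datasetBytes: Long,
                         targetBytesPerTask: Long = 128L * 1024 * 1024): Int =
    math.max(1, math.ceil(datasetBytes.toDouble / targetBytesPerTask).toInt)

  def main(args: Array[String]): Unit = {
    val gb = 1024L * 1024 * 1024
    // 3.8 GB at 128 MB per task -> ~31 tasks, so 100 partitions was plenty there...
    println(PartitionSizing.estimatePartitions(38L * gb / 10))
    // ...but 37 GB at 128 MB per task -> ~296 tasks, so 100 partitions undershoots,
    // and heavier shuffles (sorting, index building) may want several times more.
    println(PartitionSizing.estimatePartitions(37L * gb))
  }
}
```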
@ad1happy2go - Based on your suggestion, I updated the numPartitions from 100 to 1000, which should be sufficient for a dataset of size 37 GB. However, I see that the save at DatasetBulkInsertCommitActionExecutor.java:81 stage failed with the following exception:
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
Container id: container_1730319579539_0001_01_000355
Exit code: 56
[2024-10-30 20:51:25.811]Container exited with a non-zero exit code 56. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
uledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:314) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
... 13 more
24/10/30 20:51:15 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1116) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:244) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2122) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_422]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_422]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_422]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:314) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
... 13 more
24/10/30 20:51:15 ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times
see attached screenshot:
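A common mitigation for heartbeat / RPC timeouts under long GC pauses (a general Spark tuning sketch; the thread does not confirm this fixed this particular case) is to raise the relevant Spark timeouts so executors survive the pauses:

```shell
# Illustrative spark-submit flags; the values are placeholders to tune, not recommendations.
# spark.executor.heartbeatInterval must stay well below spark.network.timeout,
# and spark.rpc.askTimeout is the timeout named in the exception above.
spark-submit \
  --conf spark.network.timeout=600s \
  --conf spark.executor.heartbeatInterval=60s \
  --conf spark.rpc.askTimeout=600s \
  ...
```

Raising timeouts only buys headroom; if GC time stays high, the memory tuning discussed above is still needed.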
Followed up with @ad1happy2go in Hudi Office Hours and got more things to try:

1. NONE sort mode with the timeline server enabled: For this follow-up action item, I noticed that the data was written to S3 but the job got stuck. I used the 37 GB dataset generated with the random data generation script I posted earlier.
2. NONE sort mode, with hoodie.metadata.record.index.min.filegroup.count increased from 10 (default) to 10000: For this experiment, I used the following config:
val bulkWriteOptions: Map[String, String] = Map(
DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
"hoodie.parquet.small.file.limit" -> "1073741824",
HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
HoodieWriteConfig.BULK_INSERT_SORT_MODE.key() -> BulkInsertSortMode.NONE.name(),
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
"hoodie.metadata.record.index.enable" -> "true",
"hoodie.metadata.enable" -> "true",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.clustering.inline" -> "true",
"hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
"hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824",
"hoodie.datasource.write.partitionpath.field" -> "partition",
"hoodie.datasource.write.recordkey.field" -> "id",
"hoodie.datasource.write.precombine.field" -> "ts",
"hoodie.table.name" -> tableName,
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName,
"hoodie.write.markers.type" -> "DIRECT",
"hoodie.embed.timeline.server" -> "true",
"hoodie.metadata.record.index.min.filegroup.count" -> "10000",
"hoodie.metadata.record.index.max.filegroup.count" -> "100000"
)
I also tried with "hoodie.metadata.record.index.min.filegroup.count" -> "1000" and got the same outcome.
Here are the screenshots from the Spark UI.
Here are the random_exp_executor_stderr.log and random_exp_executor_stdout.log from one of the executors with high GC time.
I see the executor heart beat timeouts here as well. Do you have any ideas as to why we might be running into this?
INSERT mode with 37 GB of data: For this approach, I used the same input dataset and the following config:
val insertOptions: Map[String, String] = Map(
DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
"hoodie.parquet.small.file.limit" -> "1073741824",
HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
"hoodie.metadata.record.index.enable" -> "true",
"hoodie.metadata.enable" -> "true",
"hoodie.datasource.write.hive_style_partitioning" -> "true",
"hoodie.datasource.write.partitionpath.field" -> "partition",
"hoodie.datasource.write.recordkey.field" -> "id",
"hoodie.datasource.write.precombine.field" -> "ts",
"hoodie.table.name" -> tableName,
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName,
"hoodie.write.markers.type" -> "DIRECT",
"hoodie.embed.timeline.server" -> "true",
"hoodie.metadata.record.index.min.filegroup.count" -> "1000",
)
The job was successful and I was able to generate the hudi dataset with the record level index.
@dataproblems Good to know that at least you are unblocked. You should use the upsert operation going forward with RLI anyway. The "insert" and "upsert" operations also do automatic file sizing, so you may not need clustering either.
That said, bulk_insert with NONE sort mode should be faster than "insert" and should ideally need fewer resources, so it is strange that on the same resources bulk_insert failed but insert succeeded. Can you try increasing hoodie.bulkinsert.shuffle.parallelism to 10000 and see whether it succeeds?
@ad1happy2go - unfortunately, even after increasing hoodie.bulkinsert.shuffle.parallelism, I ran into the same issue. As far as insert mode goes, I tried with a dataset of 2 TB and an increased min file group count for the record level index and was able to create the table - trying with my other dataset now, which is 20x the size. If I can generate the tables for both datasets, that would be great.
@dataproblems Thanks for the update. Let us know how insert works with 20x size.
@ad1happy2go - I increased the min file group count to 10000 for the 37 TB dataset that I have; however, on two attempts to create the table using insert mode, I see the job failing during the Committing Stats: collect at SparkRDDWriteClient.java:103 stage. I can see that the data is in S3, but I think it's failing during the creation of the commit file + index files. Any recommendations on this?
Another thing I noticed was that the file sizing isn't quite optimal: I have one partition that is larger than the rest due to the inherent nature of the data, and Hudi creates 160 MB files for a partition that holds potentially multiple TBs of data.
@dataproblems Can you please share the spark UI screenshots?
On this:
> Another thing I noticed was that the file sizing isn't quite optimal, I have one partition that is larger than the rest due to the inherent nature of the data and hudi creates files of 160 MB for a partition that has potentially multiple TBs of data.
Why do you think 160 MB files are not optimal? Also, can you check the size of your timeline files under .hoodie?
@ad1happy2go A similar issue happened to me as well in bulk_insert mode: when I tried to insert a 34 GB JSON input with 100 partitions into a Hudi table (approx. 5.5 GB as parquet), I got java.lang.OutOfMemoryError: Java heap space.
I am using hoodie.bulkinsert.shuffle.parallelism = '10', hoodie.write.markers.type = 'direct', hoodie.embed.timeline.server = 'false'.
As @dataproblems stated, when I started the job with less memory I got Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 16 tasks (1080.2 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB),
which is why I too increased the spark.driver.maxResultSize property - it looks like the whole data is somehow transferred to driver memory.
But as you pointed out, the number of tasks is low for me as well due to the 100 source partitions read; let me also try repartitioning the dataset to 200 and increasing hoodie.bulkinsert.shuffle.parallelism to 500.
Any further suggestions or optimizations I should apply? In Spark SQL, Hudi table creation uses bulk_insert by default. I know I can switch to the DataFrame APIs, but when using SQL can we switch to normal insert?
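A quick back-of-the-envelope on the maxResultSize error quoted above (my own arithmetic, not from the thread): 1080.2 MiB across 16 tasks is roughly 67.5 MiB of serialized results per task, so simply raising the limit scales badly as the task count grows:

```scala
// Hypothetical arithmetic: per-task result size implied by the error message,
// and the driver-side result volume that would imply at higher parallelism.
object ResultSizeEstimate {
  def perTaskMiB(totalMiB: Double, tasks: Int): Double = totalMiB / tasks

  def main(args: Array[String]): Unit = {
    val perTask = ResultSizeEstimate.perTaskMiB(1080.2, 16) // ~67.5 MiB per task
    // At 500 tasks, the same per-task volume would be ~33 GiB collected at the driver,
    // which suggests reducing what tasks return, not just raising spark.driver.maxResultSize.
    val at500 = perTask * 500
    println(f"$perTask%.1f MiB/task, $at500%.0f MiB at 500 tasks")
  }
}
```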
Describe the problem you faced
I am unable to create a Hudi table from the data that I have with POPULATE_META_FIELDS enabled. I can create the table with POPULATE_META_FIELDS set to false.
To Reproduce
Steps to reproduce the behavior:
There are a total of 68 billion unique record keys and my total dataset is around 5TB.
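To put this scale in context (my own illustrative arithmetic; Hudi's actual record index file group sizing logic may differ), here is what 68 billion record keys mean for the hoodie.metadata.record.index.min.filegroup.count values discussed earlier in the thread:

```scala
// Hypothetical sketch: average keys per record-index file group for a given
// file group count. With the default min of 10 file groups, 68 billion keys
// would average billions of keys per group; 10000 groups brings it to millions.
object RecordIndexScale {
  def keysPerFileGroup(totalKeys: Long, fileGroups: Int): Long =
    totalKeys / fileGroups

  def main(args: Array[String]): Unit = {
    val totalKeys = 68L * 1000 * 1000 * 1000
    println(RecordIndexScale.keysPerFileGroup(totalKeys, 10))    // 6.8 billion keys per group
    println(RecordIndexScale.keysPerFileGroup(totalKeys, 10000)) // 6.8 million keys per group
  }
}
```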
Expected behavior
I should be able to create the table without any exceptions
Environment Description
Hudi version : 0.15.0, 1.0.0-beta1, 1.0.0-beta2
Spark version : 3.3, 3.4
Hive version :
Hadoop version :
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : no
Additional context
Spark submit command:
Hudi Bulk Insert Options:
I've tried GLOBAL, PARTITION_SORT, and NONE => all result in the same error.
Stacktrace
This is the piece / stage that fails:
The executors have these errors:
I see exit code 137 in the driver logs, OOM: Java Heap Space in the stdout logs.
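Since exit code 137 is typically the container being SIGKILLed (often by the resource manager for exceeding its memory limit), one general starting point (placeholder values, not a verified fix for this issue) is to give both driver and executors more headroom:

```shell
# Illustrative spark-submit memory settings; tune for the actual cluster.
# memoryOverhead covers off-heap usage (netty buffers, etc.) that heap settings miss.
spark-submit \
  --conf spark.driver.memory=16g \
  --conf spark.driver.maxResultSize=4g \
  --conf spark.executor.memory=16g \
  --conf spark.executor.memoryOverhead=4g \
  ...
```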