apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] java.lang.OutOfMemoryError #4484

Closed skdfeitian closed 1 week ago

skdfeitian commented 1 week ago

Paimon version

0.8.2

Compute Engine

flink

Minimal reproduce step

A Flink job that writes to a Paimon table is configured with 16G for the JobManager and 16G for the TaskManager, but it frequently fails with an OOM (java.lang.OutOfMemoryError). I don't understand which ManifestFileMeta operations consume so much memory. How should I address this? The other configurations are as follows:

'state.checkpoints.num-retained': '9'
'taskmanager.heartbeat.timeout': '6000000'
'taskmanager.heartbeat.interval': '900s'
'heartbeat.timeout': '900000'
'akka.ask.timeout': '900s'
'taskmanager.memory.task.off-heap.size': '10048m'
'taskmanager.memory.managed.fraction': '0.01'

2024-11-08 14:35:33,621 DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Archive local failure causing attempt c537fb3551c262d57a784f628d336298_337adade1e207453ed3502e01d75fd03_0_1 to fail: java.lang.RuntimeException: Exception occurs when preparing snapshot #116274 (path hdfs://tmp/dap_dev.db/ods_context_log/snapshot/snapshot-116274) by user 49bec96c-2860-4135-a606-5f8cf0b7561b with hash 1 and kind APPEND. Clean up.
    at org.apache.paimon.operation.FileStoreCommitImpl.tryCommitOnce(FileStoreCommitImpl.java:912)
    at org.apache.paimon.operation.FileStoreCommitImpl.tryCommit(FileStoreCommitImpl.java:681)
    at org.apache.paimon.operation.FileStoreCommitImpl.commit(FileStoreCommitImpl.java:285)
    at org.apache.paimon.table.sink.TableCommitImpl.commitMultiple(TableCommitImpl.java:209)
    at org.apache.paimon.flink.sink.StoreCommitter.commit(StoreCommitter.java:100)
    at org.apache.paimon.flink.sink.CommitterOperator.commitUpToCheckpoint(CommitterOperator.java:188)
    at org.apache.paimon.flink.sink.CommitterOperator.notifyCheckpointComplete(CommitterOperator.java:165)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:467)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:400)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1430)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$16(StreamTask.java:1371)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$19(StreamTask.java:1410)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError
    at org.apache.paimon.manifest.ManifestFileMeta.merge(ManifestFileMeta.java:183)
    at org.apache.paimon.operation.FileStoreCommitImpl.tryCommitOnce(FileStoreCommitImpl.java:833)
    ... 26 more
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError
    at org.apache.paimon.utils.ScanParallelExecutor$1.advanceIfNeeded(ScanParallelExecutor.java:85)
    at org.apache.paimon.utils.ScanParallelExecutor$1.hasNext(ScanParallelExecutor.java:60)
    at org.apache.paimon.manifest.ManifestFileMeta.tryFullCompaction(ManifestFileMeta.java:366)
    at org.apache.paimon.manifest.ManifestFileMeta.merge(ManifestFileMeta.java:161)
    ... 27 more
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.paimon.utils.ScanParallelExecutor$1.advanceIfNeeded(ScanParallelExecutor.java:83)
    ... 30 more
Caused by: java.lang.OutOfMemoryError
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.paimon.manifest.FileEntry.lambda$readManifestEntries$1(FileEntry.java:164)
    at org.apache.paimon.utils.ScanParallelExecutor$1.lambda$advanceIfNeeded$0(ScanParallelExecutor.java:81)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.paimon.shade.org.apache.avro.io.BinaryDecoder.readBytes(BinaryDecoder.java:314)
    at org.apache.paimon.format.avro.FieldReaderFactory$BytesReader.read(FieldReaderFactory.java:201)
    at org.apache.paimon.format.avro.FieldReaderFactory$RowReader.read(FieldReaderFactory.java:497)
    at org.apache.paimon.format.avro.FieldReaderFactory$RowReader.read(FieldReaderFactory.java:449)
    at org.apache.paimon.format.avro.FieldReaderFactory$NullableReader.read(FieldReaderFactory.java:166)
    at org.apache.paimon.format.avro.FieldReaderFactory$RowReader.read(FieldReaderFactory.java:497)
    at org.apache.paimon.format.avro.FieldReaderFactory$RowReader.read(FieldReaderFactory.java:449)
    at org.apache.paimon.format.avro.FieldReaderFactory$NullableReader.read(FieldReaderFactory.java:166)
    at org.apache.paimon.format.avro.FieldReaderFactory$RowReader.read(FieldReaderFactory.java:497)
    at org.apache.paimon.format.avro.AvroRowDatumReader.read(AvroRowDatumReader.java:63)
    at org.apache.paimon.format.avro.AvroRowDatumReader.read(AvroRowDatumReader.java:32)
    at org.apache.paimon.shade.org.apache.avro.file.DataFileStream.next(DataFileStream.java:263)
    at org.apache.paimon.format.avro.AvroBulkFormat$AvroBlockIterator.next(AvroBulkFormat.java:149)
    at org.apache.paimon.format.avro.AvroBulkFormat$AvroBlockIterator.next(AvroBulkFormat.java:127)
    at org.apache.paimon.utils.IteratorResultIterator.next(IteratorResultIterator.java:53)
    at org.apache.paimon.reader.RecordReader.forEachRemaining(RecordReader.java:136)
    at org.apache.paimon.utils.ObjectsFile.readWithIOException(ObjectsFile.java:130)
    at org.apache.paimon.utils.ObjectsFile.read(ObjectsFile.java:108)
    at org.apache.paimon.utils.ObjectsFile.read(ObjectsFile.java:82)
    at org.apache.paimon.manifest.FileEntry.lambda$null$0(FileEntry.java:163)
    at org.apache.paimon.manifest.FileEntry$$Lambda$1286/700622616.apply(Unknown Source)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
    at java.util.stream.AbstractTask.compute(AbstractTask.java:316)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

What doesn't meet your expectations?

I believe the configured memory is large enough. How can I optimize or resolve this issue?

Anything else?

No response

liming30 commented 1 week ago

@skdfeitian Hi, Paimon uses heap memory for reading and writing by default. I noticed that you configured 'taskmanager.memory.task.off-heap.size': '10048m', which means the heap may be only about 2GB. You could try lowering taskmanager.memory.task.off-heap.size to 256MB or 512MB. See https://paimon.apache.org/docs/master/maintenance/write-performance/ for more information.
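
To make the arithmetic behind this suggestion concrete, here is a rough sketch (not part of the original reply) that estimates how much task heap is left once the off-heap reservation is subtracted. It assumes the 16G applies to the total TaskManager process size. The default values below (JVM overhead, metaspace, framework heap/off-heap, network fraction) are assumptions based on commonly documented Flink defaults and should be checked against the Flink version in use. The function `estimate_task_heap_mb` is a hypothetical helper, not a Flink or Paimon API.

```python
def clamp(value, low, high=None):
    """Bound value to [low, high]; high=None means no upper bound."""
    if high is not None:
        value = min(value, high)
    return max(value, low)


def estimate_task_heap_mb(
    process_mb,
    task_off_heap_mb,
    managed_fraction,
    # Everything below is an ASSUMED default; verify for your Flink version.
    network_fraction=0.1,
    network_min_mb=64,
    network_max_mb=None,
    jvm_overhead_fraction=0.1,
    jvm_overhead_min_mb=192,
    jvm_overhead_max_mb=1024,
    metaspace_mb=256,
    framework_heap_mb=128,
    framework_off_heap_mb=128,
):
    # JVM overhead is derived from the total process size.
    overhead = clamp(process_mb * jvm_overhead_fraction,
                     jvm_overhead_min_mb, jvm_overhead_max_mb)
    # Total Flink memory is what remains after overhead and metaspace.
    flink_mb = process_mb - overhead - metaspace_mb
    # Network and managed memory are fractions of total Flink memory.
    network = clamp(flink_mb * network_fraction, network_min_mb, network_max_mb)
    managed = flink_mb * managed_fraction
    # Task heap is the remainder after every other component is reserved.
    return (flink_mb - network - managed
            - framework_heap_mb - framework_off_heap_mb - task_off_heap_mb)


if __name__ == "__main__":
    # Settings from the report vs. the suggested smaller off-heap size.
    reported = estimate_task_heap_mb(16 * 1024, 10048, 0.01)
    suggested = estimate_task_heap_mb(16 * 1024, 512, 0.01)
    print(f"task heap with 10048m off-heap: ~{reported:.0f} MB")
    print(f"task heap with 512m off-heap:   ~{suggested:.0f} MB")
```

Under these assumptions, the reported settings leave only a few GB of task heap out of 16G, while reducing the off-heap reservation to 512MB returns most of the process memory to the heap, which is where the manifest entries in the stack trace above are being allocated (`java.nio.HeapByteBuffer`). The exact figures depend on the actual defaults and deployment mode.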

skdfeitian commented 1 week ago

@liming30 I made some changes based on your advice; it seems to be working. Thank you very much!