apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[Bug] Performance issues with appendOnly table with a large number of partitions #913

Closed. Cqz666 closed this issue 1 year ago.

Cqz666 commented 1 year ago

Search before asking

Paimon version

0.4

Compute Engine

Flink

Minimal reproduce step

I use Paimon to build an append-only table that writes user logs to HDFS. The table has a large number of fields, about 300+, and defines two partition levels: the first by date and the second by event. There are about 400 distinct events, so we can see 400+ directories under each date partition directory.
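For reference, a minimal Flink SQL sketch of a table laid out this way (the table name, column names, and option spelling are illustrative, not taken from the report, and the real table has 300+ fields):

    -- Hypothetical DDL approximating the layout described above.
    -- Only a few of the 300+ fields are shown.
    CREATE TABLE user_log (
        user_id  STRING,
        props    STRING,
        log_date STRING,
        event    STRING  -- ~400 distinct values => 400+ sub-directories per date
    ) PARTITIONED BY (log_date, event) WITH (
        'write-mode' = 'append-only'  -- append-only write mode, as in Paimon 0.4
    );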

When I first started the job, writing went smoothly without any end-to-end delay, but after the job had run for about 30 minutes, backpressure began to build and could not be eliminated at all. I suspect checkpointing takes too long and is dragging down data throughput over time: my checkpoint interval is 1 minute and each checkpoint takes almost tens of seconds, but once backpressure occurs the checkpoint duration increases further, end-to-end delay starts to grow, and normal consumption levels cannot be restored.

During this period I tried increasing the sink parallelism and the memory resources, but it didn't help. I also increased the manifest.cache-size configuration, but that didn't help either.
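A sketch of how those knobs can be set on the write path through Flink dynamic table options (the table names and values here are illustrative, not the exact ones tried):

    -- Illustrative only: raising sink parallelism and the manifest cache
    -- via a dynamic-options hint on the sink table.
    INSERT INTO user_log /*+ OPTIONS(
        'sink.parallelism'    = '24',
        'manifest.cache-size' = '512 mb'
    ) */
    SELECT user_id, props, log_date, event FROM source_logs;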

How can I go about solving the above performance issues?

What doesn't meet your expectations?

Writing to an append-only table with a large number of partitions has performance issues.

Anything else?

No response

Are you willing to submit a PR?

JingsongLi commented 1 year ago

Hi @Cqz666, can you look into where the specific bottleneck is? For example, there is backpressure now; after increasing the parallelism, are all tasks still backpressured?

Have you looked at the TaskManager's jstack while backpressure is occurring? Could you post it here?

Cqz666 commented 1 year ago

@JingsongLi The following are thread stack snapshots captured at different moments under backpressure:


[arthas@362588]$ thread -n 5
"file-store-common-io-23" Id=9140 cpuUsage=64.63% deltaTime=132ms time=60038ms RUNNABLE
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:157)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:112)
    at org.apache.paimon.utils.ObjectsCache.read(ObjectsCache.java:62)
    at org.apache.paimon.utils.ObjectsFile.read(ObjectsFile.java:59)
    at org.apache.paimon.operation.AbstractFileStoreScan.readManifestFileMeta(AbstractFileStoreScan.java:332)
    at org.apache.paimon.operation.AbstractFileStoreScan.lambda$null$2(AbstractFileStoreScan.java:202)
    at org.apache.paimon.operation.AbstractFileStoreScan$$Lambda$1192/1022226566.apply(Unknown Source)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
    at java.util.stream.AbstractTask.compute(AbstractTask.java:316)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

"file-store-common-io-31" Id=14669 cpuUsage=64.57% deltaTime=132ms time=8151ms RUNNABLE
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:110)
    at org.apache.paimon.utils.ObjectsCache.read(ObjectsCache.java:62)
    at org.apache.paimon.utils.ObjectsFile.read(ObjectsFile.java:59)
    at org.apache.paimon.operation.AbstractFileStoreScan.readManifestFileMeta(AbstractFileStoreScan.java:332)
    at org.apache.paimon.operation.AbstractFileStoreScan.lambda$null$2(AbstractFileStoreScan.java:202)
    at org.apache.paimon.operation.AbstractFileStoreScan$$Lambda$1192/1022226566.apply(Unknown Source)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
    at java.util.stream.AbstractTask.compute(AbstractTask.java:316)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

"file-store-common-io-52" Id=12282 cpuUsage=64.53% deltaTime=132ms time=35003ms RUNNABLE
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:157)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$RowReader.readRow(RowCompactedSerializer.java:624)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$RowReader.access$900(RowCompactedSerializer.java:500)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.lambda$createFieldReader$3d14a579$1(RowCompactedSerializer.java:301)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$$Lambda$955/1509512458.readField(Unknown Source)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.lambda$createFieldReader$9dbec689$1(RowCompactedSerializer.java:313)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$$Lambda$953/1491723395.readField(Unknown Source)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:157)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$RowReader.readRow(RowCompactedSerializer.java:624)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$RowReader.access$900(RowCompactedSerializer.java:500)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.lambda$createFieldReader$3d14a579$1(RowCompactedSerializer.java:301)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$$Lambda$955/1509512458.readField(Unknown Source)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.lambda$createFieldReader$9dbec689$1(RowCompactedSerializer.java:313)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$$Lambda$953/1491723395.readField(Unknown Source)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:157)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:112)
    at org.apache.paimon.utils.ObjectsCache.read(ObjectsCache.java:62)
    at org.apache.paimon.utils.ObjectsFile.read(ObjectsFile.java:59)
    at org.apache.paimon.operation.AbstractFileStoreScan.readManifestFileMeta(AbstractFileStoreScan.java:332)
    at org.apache.paimon.operation.AbstractFileStoreScan.lambda$null$2(AbstractFileStoreScan.java:202)
    at org.apache.paimon.operation.AbstractFileStoreScan$$Lambda$1192/1022226566.apply(Unknown Source)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
    at java.util.stream.AbstractTask.compute(AbstractTask.java:316)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

"file-store-common-io-50" Id=1291 cpuUsage=64.4% deltaTime=132ms time=80164ms RUNNABLE
    at org.apache.paimon.data.serializer.RowCompactedSerializer$RowReader.readBinary(RowCompactedSerializer.java:600)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$RowReader.access$2000(RowCompactedSerializer.java:500)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.lambda$createFieldReader$fe644207$3(RowCompactedSerializer.java:260)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$$Lambda$949/1603997226.readField(Unknown Source)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:157)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$RowReader.readRow(RowCompactedSerializer.java:624)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$RowReader.access$900(RowCompactedSerializer.java:500)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.lambda$createFieldReader$3d14a579$1(RowCompactedSerializer.java:301)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$$Lambda$955/1509512458.readField(Unknown Source)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.lambda$createFieldReader$9dbec689$1(RowCompactedSerializer.java:313)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$$Lambda$953/1491723395.readField(Unknown Source)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:157)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$RowReader.readRow(RowCompactedSerializer.java:624)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$RowReader.access$900(RowCompactedSerializer.java:500)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.lambda$createFieldReader$3d14a579$1(RowCompactedSerializer.java:301)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$$Lambda$955/1509512458.readField(Unknown Source)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.lambda$createFieldReader$9dbec689$1(RowCompactedSerializer.java:313)
    at org.apache.paimon.data.serializer.RowCompactedSerializer$$Lambda$953/1491723395.readField(Unknown Source)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:157)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:112)
    at org.apache.paimon.utils.ObjectsCache.read(ObjectsCache.java:62)
    at org.apache.paimon.utils.ObjectsFile.read(ObjectsFile.java:59)
    at org.apache.paimon.operation.AbstractFileStoreScan.readManifestFileMeta(AbstractFileStoreScan.java:332)
    at org.apache.paimon.operation.AbstractFileStoreScan.lambda$null$2(AbstractFileStoreScan.java:202)
    at org.apache.paimon.operation.AbstractFileStoreScan$$Lambda$1192/1022226566.apply(Unknown Source)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
    at java.util.stream.AbstractTask.compute(AbstractTask.java:316)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

"file-store-common-io-8" Id=3908 cpuUsage=64.28% deltaTime=132ms time=87541ms RUNNABLE
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:157)
    at org.apache.paimon.data.serializer.RowCompactedSerializer.deserialize(RowCompactedSerializer.java:112)
    at org.apache.paimon.utils.ObjectsCache.read(ObjectsCache.java:62)
    at org.apache.paimon.utils.ObjectsFile.read(ObjectsFile.java:59)
    at org.apache.paimon.operation.AbstractFileStoreScan.readManifestFileMeta(AbstractFileStoreScan.java:332)
    at org.apache.paimon.operation.AbstractFileStoreScan.lambda$null$2(AbstractFileStoreScan.java:202)
    at org.apache.paimon.operation.AbstractFileStoreScan$$Lambda$1192/1022226566.apply(Unknown Source)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
    at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
    at java.util.stream.AbstractTask.compute(AbstractTask.java:316)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

[arthas@362588]$ thread -n 5
"GC task thread#7 (ParallelGC)" [Internal] cpuUsage=16.18% deltaTime=32ms time=165325ms

"GC task thread#8 (ParallelGC)" [Internal] cpuUsage=16.17% deltaTime=32ms time=165710ms

"GC task thread#5 (ParallelGC)" [Internal] cpuUsage=16.16% deltaTime=32ms time=165538ms

"GC task thread#15 (ParallelGC)" [Internal] cpuUsage=16.07% deltaTime=32ms time=165414ms

"GC task thread#23 (ParallelGC)" [Internal] cpuUsage=16.07% deltaTime=32ms time=165154ms

[arthas@362588]$ thread -n 5
"Writer -> gamebox_event (11/12)#0" Id=115 cpuUsage=96.5% deltaTime=194ms time=780415ms RUNNABLE
    at java.util.zip.Inflater.inflateBytes(Native Method)
    at java.util.zip.Inflater.inflate(Inflater.java:259)
    at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at com.ctc.wstx.io.BaseReader.readBytes(BaseReader.java:155)
    at com.ctc.wstx.io.UTF8Reader.loadMore(UTF8Reader.java:369)
    at com.ctc.wstx.io.UTF8Reader.read(UTF8Reader.java:112)
    at com.ctc.wstx.io.ReaderSource.readInto(ReaderSource.java:89)
    at com.ctc.wstx.io.BranchingReaderSource.readInto(BranchingReaderSource.java:57)
    at com.ctc.wstx.sr.StreamScanner.loadMore(StreamScanner.java:998)
    at com.ctc.wstx.sr.StreamScanner.getNext(StreamScanner.java:757)
    at com.ctc.wstx.sr.BasicStreamReader.nextFromTree(BasicStreamReader.java:2793)
    at com.ctc.wstx.sr.BasicStreamReader.next(BasicStreamReader.java:1123)
    at org.apache.hadoop.conf.Configuration$Parser.parseNext(Configuration.java:3347)
    at org.apache.hadoop.conf.Configuration$Parser.parse(Configuration.java:3141)
    at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:3034)
    at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2995)
    at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2875)
    at org.apache.hadoop.conf.Configuration.get(Configuration.java:1223)
    at org.apache.paimon.shaded.org.apache.parquet.hadoop.util.ConfigurationUtil.getClassFromConfig(ConfigurationUtil.java:27)
    at org.apache.paimon.shaded.org.apache.parquet.crypto.EncryptionPropertiesFactory.loadFactory(EncryptionPropertiesFactory.java:62)
    at org.apache.paimon.shaded.org.apache.parquet.hadoop.ParquetOutputFormat.createEncryptionProperties(ParquetOutputFormat.java:548)
    at org.apache.paimon.shaded.org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:285)
    at org.apache.paimon.shaded.org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:675)
    at org.apache.paimon.format.parquet.writer.RowDataParquetBuilder.createWriter(RowDataParquetBuilder.java:81)
    at org.apache.paimon.format.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:52)
    at org.apache.paimon.io.SingleFileWriter.<init>(SingleFileWriter.java:70)
    at org.apache.paimon.io.StatsCollectingSingleFileWriter.<init>(StatsCollectingSingleFileWriter.java:56)
    at org.apache.paimon.io.RowDataFileWriter.<init>(RowDataFileWriter.java:55)
    at org.apache.paimon.io.RowDataRollingFileWriter.lambda$new$0(RowDataRollingFileWriter.java:46)
    at org.apache.paimon.io.RowDataRollingFileWriter$$Lambda$1088/384057978.get(Unknown Source)
    at org.apache.paimon.io.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:101)
    at org.apache.paimon.io.RollingFileWriter.write(RollingFileWriter.java:80)
    at org.apache.paimon.append.AppendOnlyWriter.write(AppendOnlyWriter.java:106)
    at org.apache.paimon.append.AppendOnlyWriter.write(AppendOnlyWriter.java:50)
    at org.apache.paimon.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:93)
    at org.apache.paimon.table.sink.TableWriteImpl.writeAndReturn(TableWriteImpl.java:94)
    at org.apache.paimon.flink.sink.StoreSinkWriteImpl.write(StoreSinkWriteImpl.java:70)
    at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.processElement(RowDataStoreWriteOperator.java:149)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$761/871268494.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task$$Lambda$998/1384549270.run(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:745)

"Source: gamebox_event -> Map (11/12)#0" Id=113 cpuUsage=20.17% deltaTime=40ms time=273330ms TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6c5371b8
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6c5371b8
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task$$Lambda$998/1384549270.run(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:745)

"C2 CompilerThread0" [Internal] cpuUsage=5.29% deltaTime=10ms time=15998ms

[arthas@362588]$ thread -n 5
"Writer -> gamebox_event (11/12)#0" Id=115 cpuUsage=32.15% deltaTime=64ms time=812009ms RUNNABLE
    at org.apache.paimon.shaded.org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.writeBytes(DictionaryValuesWriter.java:247)
    at org.apache.paimon.shaded.org.apache.parquet.column.values.fallback.FallbackValuesWriter.writeBytes(FallbackValuesWriter.java:172)
    at org.apache.paimon.shaded.org.apache.parquet.column.impl.ColumnWriterBase.write(ColumnWriterBase.java:239)
    at org.apache.paimon.shaded.org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:476)
    at org.apache.paimon.format.parquet.writer.ParquetRowDataWriter$StringWriter.writeString(ParquetRowDataWriter.java:269)
    at org.apache.paimon.format.parquet.writer.ParquetRowDataWriter$StringWriter.write(ParquetRowDataWriter.java:260)
    at org.apache.paimon.format.parquet.writer.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:510)
    at org.apache.paimon.format.parquet.writer.ParquetRowDataWriter.write(ParquetRowDataWriter.java:73)
    at org.apache.paimon.format.parquet.writer.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:75)
    at org.apache.paimon.format.parquet.writer.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:56)
    at org.apache.paimon.shaded.org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:138)
    at org.apache.paimon.shaded.org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:310)
    at org.apache.paimon.format.parquet.writer.ParquetBulkWriter.addElement(ParquetBulkWriter.java:47)
    at org.apache.paimon.io.SingleFileWriter.writeImpl(SingleFileWriter.java:102)
    at org.apache.paimon.io.StatsCollectingSingleFileWriter.write(StatsCollectingSingleFileWriter.java:65)
    at org.apache.paimon.io.RowDataFileWriter.write(RowDataFileWriter.java:63)
    at org.apache.paimon.io.RowDataFileWriter.write(RowDataFileWriter.java:41)
    at org.apache.paimon.io.RollingFileWriter.write(RollingFileWriter.java:83)
    at org.apache.paimon.append.AppendOnlyWriter.write(AppendOnlyWriter.java:106)
    at org.apache.paimon.append.AppendOnlyWriter.write(AppendOnlyWriter.java:50)
    at org.apache.paimon.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:93)
    at org.apache.paimon.table.sink.TableWriteImpl.writeAndReturn(TableWriteImpl.java:94)
    at org.apache.paimon.flink.sink.StoreSinkWriteImpl.write(StoreSinkWriteImpl.java:70)
    at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.processElement(RowDataStoreWriteOperator.java:149)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$761/871268494.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task$$Lambda$998/1384549270.run(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:745)

"Source: gamebox_event -> Map (11/12)#0" Id=113 cpuUsage=27.15% deltaTime=54ms time=282500ms TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6c5371b8
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@6c5371b8
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:335)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task$$Lambda$998/1384549270.run(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:745)

"Source Data Fetcher for Source: gamebox_event -> Map (11/12)#0" Id=122 cpuUsage=5.48% deltaTime=11ms time=109314ms WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@c23d8ff
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@c23d8ff
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.waitOnPut(FutureCompletingBlockingQueue.java:366)
    at org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.put(FutureCompletingBlockingQueue.java:203)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:64)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
JingsongLi commented 1 year ago

@Cqz666 Thanks a lot!

Cqz666 commented 1 year ago

@JingsongLi When I define only the first-level partition, i.e. the date partition, the job seems to run well: the checkpoint time is greatly shortened and the backpressure disappears.

So does a large number of partitions cause high overhead when generating or reading metadata?

The second-level partition was intended for partition pruning, to filter data better. Paimon has the concept of bucketing; if I set 'bucket-key'='event', it seems reads can also target specific buckets to reduce the amount of data scanned?

Is a second-level partition really necessary? And will the framework optimize writes to a large number of partitions?

JingsongLi commented 1 year ago

Paimon has the concept of bucketing; if I set 'bucket-key'='event', it seems reads can also target specific buckets to reduce the amount of data scanned?

Yes, it is correct.
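A sketch of that alternative layout: partition by date only and bucket by event, so reads that filter on event hit only the relevant buckets. Names and option values are hypothetical (the 24-bucket figure matches the setup mentioned later in this thread):

    -- Hypothetical DDL: a single date partition plus event-based bucketing.
    CREATE TABLE user_log_bucketed (
        user_id  STRING,
        props    STRING,
        log_date STRING,
        event    STRING
    ) PARTITIONED BY (log_date) WITH (
        'write-mode' = 'append-only',
        'bucket'     = '24',      -- fixed bucket count per partition
        'bucket-key' = 'event'    -- rows with the same event land in the same bucket
    );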

JingsongLi commented 1 year ago

And will the framework optimize writes to a large number of partitions?

We should optimize this situation on the framework side! Don't worry.

Cqz666 commented 1 year ago

And will the framework optimize writes to a large number of partitions?

We should optimize this situation on the framework side! Don't worry.

Happy to hear that, thanks a lot!

JingsongLi commented 1 year ago

Hi, what is your parallelism? How many buckets did you define?

Cqz666 commented 1 year ago

Hi, what is your parallelism? How many buckets did you define?

At the beginning, when I defined two partition levels, I tried different parallelism and bucket counts, such as 12, 24, and 32 (the bucket count and the parallelism were always equal), but none of those configurations ran stably.

Now I define a single partition level with a parallelism and bucket count of 24, and the job works fine (no end-to-end delay, reasonable resource consumption).

Cqz666 commented 1 year ago

@JingsongLi The problem doesn't seem to be resolved..... orz [screenshot attached]