apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] How to improve the speed of Flink writing to Hudi? #8071

Open DavidZ1 opened 1 year ago

DavidZ1 commented 1 year ago


Describe the problem you faced

We have a car cloud business whose data is consumed in real time by Flink jobs and written to Hudi. The source is Kafka, and the job parses the JSON messages it reads from Kafka. The Hudi table has about 3,600 fields, 90% of which are of type double. In our tests, however, Flink writes to Hudi relatively slowly and cannot keep up with the rate at which messages are produced to Kafka. We have not yet been able to find the reason.



Environment Description

Additional context

1. Hudi config

checkpoint.interval=300
checkpoint.timeout=600
compaction.max_memory=1024
payload.class.name=org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload
compaction.delta_commits=20
compaction.trigger.strategy=num_or_time
compaction.delta_seconds=3600
clean.policy=KEEP_LATEST_COMMITS
clean.retain_commits=2
hoodie.bucket.index.num.buckets=40
archive.max_commits=50
archive.min_commits=40
table.type=MERGE_ON_READ
hoodie.datasource.write.hive_style_partitioning=true
index.type=BUCKET
write.operation=upsert
compaction.schedule.enabled=true
compaction.async.enabled=true

2. hoodie.properties

hoodie.table.precombine.field=acquire_timestamp
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.partition.fields=pt,ht
hoodie.table.type=MERGE_ON_READ
hoodie.archivelog.folder=archived
hoodie.table.cdc.enabled=false
hoodie.compaction.payload.class=org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload
hoodie.table.version=5
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=vin,acquire_timestamp
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.name=ods_icv_can_hudi_temp
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator
hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
hoodie.datasource.write.hive_style_partitioning=true

3. Kafka: 24 partitions, 200G of messages per hour. Each message is JSON, and Flink extracts about 3,600 signal fields (double) from each JSON message.

4. Flink: We ran the job with either 20 tasks (2 cores and 8 GB of memory each) or 48 tasks (1 core and 4 GB of memory each). After running for an hour, we found that consumption could not keep up with message production.
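For scale, the figures above can be turned into average byte rates. This is only a back-of-the-envelope sketch: it assumes "200G" means 200 * 10**9 bytes of raw JSON per hour and that load is spread evenly, neither of which the report states.

```python
# Back-of-the-envelope ingest rates from the figures in this report.
# Assumption: "200G" means 200 * 10**9 bytes of raw JSON per hour,
# spread evenly over partitions and tasks.
BYTES_PER_HOUR = 200 * 10**9
SECONDS_PER_HOUR = 3600


def mb_per_second(bytes_per_hour, ways=1):
    """Average MB/s that each of `ways` parallel consumers must sustain."""
    return bytes_per_hour / SECONDS_PER_HOUR / ways / 10**6


total = mb_per_second(BYTES_PER_HOUR)
per_partition = mb_per_second(BYTES_PER_HOUR, ways=24)  # 24 Kafka partitions
per_task_20 = mb_per_second(BYTES_PER_HOUR, ways=20)    # 20-task layout
per_task_48 = mb_per_second(BYTES_PER_HOUR, ways=48)    # 48-task layout

print(f"total:         {total:.1f} MB/s")
print(f"per partition: {per_partition:.2f} MB/s")
print(f"per task (20): {per_task_20:.2f} MB/s")
print(f"per task (48): {per_task_48:.2f} MB/s")
```

These are input-side JSON rates only; they say nothing about how many bytes end up on storage after parsing and Parquet encoding.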

We use Oceanus, Tencent Cloud's stream computing platform, where 1 compute unit (CU) includes 1 CPU core and 4 GB of memory. Depending on the upstream and downstream systems and the processing logic, 1 CU can process about 5,000 to 50,000 records per second: roughly 30,000 to 50,000 records/second/core for simple jobs and 5,000 to 10,000 records/second/core for complex jobs.

At the same time, writing to COS produces many small files; the count can exceed 4,000.
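One way to read the compaction settings in the Hudi config above is to ask which half of the num_or_time strategy fires first. The sketch below assumes that checkpoint.interval is in seconds and that the Flink writer produces one delta commit per completed checkpoint; the trigger labels are illustrative names, not values read from Hudi.

```python
# Sketch: which compaction trigger fires first under num_or_time?
# Assumptions: checkpoint.interval is in seconds, and one delta commit
# is produced per completed checkpoint.
checkpoint_interval_s = 300  # checkpoint.interval
delta_commits = 20           # compaction.delta_commits
delta_seconds = 3600         # compaction.delta_seconds


def first_compaction_trigger(interval_s, commits, seconds):
    """Return (label, seconds_until_trigger) for an either-or strategy."""
    time_for_commits = interval_s * commits
    if time_for_commits <= seconds:
        return ("num_commits", time_for_commits)
    return ("time_elapsed", seconds)


trigger, after_s = first_compaction_trigger(
    checkpoint_interval_s, delta_commits, delta_seconds
)
print(trigger, after_s)  # time_elapsed 3600
```

Under these assumptions 20 commits take 6,000 seconds, so the 3,600-second limit is reached first and a compaction would be scheduled about once an hour.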


danny0405 commented 1 year ago

Thanks for the feedback. I see that you use the hashing index with the MOR table type. Does increasing the parallelism improve the write throughput here? How about increasing write.tasks to 40?

DavidZ1 commented 1 year ago

Yes, we tried increasing write.tasks to 40, but it did not improve the write throughput.

danny0405 commented 1 year ago

Which step is the bottleneck, the bucket stream write?

DavidZ1 commented 1 year ago

We are still looking for the root cause; our current guess is the Hudi bucket write. We already use the Flink filesystem method to write Parquet files directly to COS storage from the same source as the Hudi job, and its write efficiency is very high. The source parallelism is the same as the sink parallelism, which is 24.

danny0405 commented 1 year ago

we are currently using the flink filesystem method to directly write parquet files

Does that mean you have the filesystem source?

DavidZ1 commented 1 year ago

Yes, my colleague wrote the code for the Flink filesystem job.

DavidZ1 commented 1 year ago

Previously we wrote about 3,600 fields to the Hudi table, and write performance did not improve. Today we ran another test with the number of fields in the Hudi table reduced to 1,000. Write performance improved greatly, and Kafka consumption sped up enough to keep up with the production rate.

But we still cannot determine which step in Hudi is taking so long.
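To put the two test runs side by side, here is a rough estimate of how much raw double data each record carries. It is a sketch under stated assumptions: 90% of the fields are 8-byte doubles in both runs (the thread gives that ratio only for the 3,600-field table), and encoding, compression, and the remaining 10% of fields are ignored.

```python
# Rough per-record payload of double fields in the two test runs.
# Assumption: 90% of fields are 8-byte doubles in both runs; the other
# fields, encoding and compression are ignored.
DOUBLE_BYTES = 8


def double_payload_bytes(num_fields):
    """Bytes of raw double values in one record (integer arithmetic)."""
    num_doubles = num_fields * 9 // 10
    return num_doubles * DOUBLE_BYTES


wide = double_payload_bytes(3600)
narrow = double_payload_bytes(1000)
print(wide, narrow, wide / narrow)  # 25920 7200 3.6
```

A Parquet file also holds one column chunk per column in each row group, so the wider table means 3.6 times as many column chunks to encode per row group, in addition to the larger payload.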


danny0405 commented 1 year ago

Field SE/DE (serialization/deserialization) really takes time, especially for string-type fields that hold long strings.

DavidZ1 commented 1 year ago

Understood. The 3,600 fields in our Hudi table are almost all of type double. For such a wide table, is there any optimization that would improve write throughput?

danny0405 commented 1 year ago

Do you need updates? If not, you can just set the operation type to INSERT and the table type to COPY_ON_WRITE.

DavidZ1 commented 1 year ago

No. We have tried insert mode with both the MOR and COW table types, but the write throughput still did not improve.

voonhous commented 1 year ago

@DavidZ1 If you are running it under append-only [INSERT mode + COPY_ON_WRITE], you can improve write throughput by changing the default compression codec from gzip to SNAPPY / LZ4.

Snappy should be supported out of the box, but you will be trading off storage space for write throughput.

I would recommend LZ4 if you have the appropriate bundles included in your environment.

DavidZ1 commented 1 year ago


Thx. We use the Flink jar method to write to Hudi, but I can't find a parameter for setting the compression format in the FlinkOptions class.

voonhous commented 1 year ago

You are right that there are no Flink configs for it. What I did in the past was configure it using hoodie.parquet.compression.codec. You can verify whether the configuration has taken effect by checking your logs.

Reference: https://hudi.apache.org/docs/configurations/#hoodieparquetcompressioncodec
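As a minimal sketch of the suggestion above, the append-only options can be collected in one place and rendered as a Flink SQL WITH clause. The helper `render_with_clause` is a hypothetical name introduced here for illustration; the option keys and values are the ones mentioned in this thread, and the same pairs could be passed as options when building the sink from a Flink jar.

```python
# Sketch: render table options as a Flink SQL WITH clause.
# render_with_clause is a hypothetical helper; the option keys and
# values are the ones discussed in this thread.
def render_with_clause(options):
    """Render a dict of options as a Flink SQL WITH (...) clause."""
    lines = [f"  '{key}' = '{value}'" for key, value in options.items()]
    return "WITH (\n" + ",\n".join(lines) + "\n)"


append_only_options = {
    "connector": "hudi",
    "table.type": "COPY_ON_WRITE",
    "write.operation": "insert",
    "hoodie.parquet.compression.codec": "snappy",
}

print(render_with_clause(append_only_options))
```

Whether the codec actually took effect still has to be confirmed from the job logs, as suggested above.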

DavidZ1 commented 1 year ago

Thx. We switched the compression codec to lz4, but found that the platform does not support it. The exception is as follows:

java.io.IOException: java.io.IOException: Exception happened when bulk insert.
    at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.write(BulkInsertWriterHelper.java:117)
    at org.apache.hudi.sink.append.AppendWriteFunction.processElement(AppendWriteFunction.java:86)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Exception happened when bulk insert.
    at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.write(BulkInsertWriterHelper.java:115)
    ... 15 more
Caused by: java.lang.RuntimeException: native lz4 library not available
    at org.apache.hadoop.io.compress.Lz4Codec.getCompressorType(Lz4Codec.java:125)
    at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:150)
    at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:146)
    at org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:208)
    at org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:191)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:296)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:228)
    at org.apache.hudi.io.storage.row.HoodieRowDataParquetWriter.<init>(HoodieRowDataParquetWriter.java:45)
    at org.apache.hudi.io.storage.row.HoodieRowDataFileWriterFactory.newParquetInternalRowFileWriter(HoodieRowDataFileWriterFactory.java:79)
    at org.apache.hudi.io.storage.row.HoodieRowDataFileWriterFactory.getRowDataFileWriter(HoodieRowDataFileWriterFactory.java:55)
    at org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle.createNewFileWriter(HoodieRowDataCreateHandle.java:211)
    at org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle.<init>(HoodieRowDataCreateHandle.java:103)
    at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.getRowCreateHandle(BulkInsertWriterHelper.java:134)
    at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.write(BulkInsertWriterHelper.java:110)
    ... 15 more

Then we switched to snappy, and write performance did improve to some extent. However, because we use Tencent Cloud COS for storage, COW writes ran into a list frequency control problem on COS, so overall performance could not improve much. The exception is as follows:

org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize
    at org.apache.hudi.sink.utils.TimeWait.waitFor(TimeWait.java:57)
    at org.apache.hudi.sink.common.AbstractStreamWriteFunction.instantToWrite(AbstractStreamWriteFunction.java:276)
    at org.apache.hudi.sink.append.AppendWriteFunction.initWriterHelper(AppendWriteFunction.java:110)
    at org.apache.hudi.sink.append.AppendWriteFunction.processElement(AppendWriteFunction.java:84)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:305)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:750)


danny0405 commented 1 year ago

Sorry for the late reply. Are you already using append mode, and is it still slow?

voonhous commented 1 year ago


Yep, judging from the stack trace, he is running his job in append-only mode.

org.apache.hudi.sink.append.AppendWriteFunction.initWriterHelper(AppendWriteFunction.java:110)


This feels like a COS issue. @DavidZ1, you mentioned there was a list frequency control problem in COW writing. So it's spending too much time listing files? IIUC, your job might be writing too many Parquet files while flushing. I am not very familiar with COS, so I am taking a shot in the dark here. Looking at your configurations, write.parquet.max.file.size is left at its default, which is 120 MB.

Perhaps you could try increasing this so that fewer Parquet files are written? Do note that your Parquet files will get larger, and I am not sure how this will affect your read performance. Given that this is an append-only job, there should be no penalty on write performance.
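A maximum file size only caps how large a file may grow; it does not by itself set a floor on how many files are written. The sketch below estimates such a floor under an assumption that is not confirmed in this thread: in append mode, every write task closes its open Parquet file at each checkpoint and starts a new one. The task count (24) and the 300-second interval come from earlier comments; `partitions_per_task` is a hypothetical knob.

```python
# Sketch: lower bound on new files per hour in append mode.
# Assumption (not confirmed in this thread): each write task closes its
# Parquet file at every checkpoint and opens a new one afterwards.
def min_files_per_hour(write_tasks, checkpoint_interval_s, partitions_per_task=1):
    """Files per hour if every task rolls one file per partition per checkpoint."""
    checkpoints_per_hour = 3600 // checkpoint_interval_s
    return write_tasks * checkpoints_per_hour * partitions_per_task


print(min_files_per_hour(24, 300))     # 288
print(min_files_per_hour(24, 300, 2))  # 576
```

If that assumption holds, raising write.parquet.max.file.size cannot push the file count below this floor; only fewer write tasks, a longer checkpoint interval, or fewer partitions touched per task would.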

DavidZ1 commented 1 year ago

Yes, Tencent COS currently applies frequency control to file-list reads. Our data is streaming signal data transmitted from vehicles, so some data is stored late and reported late. We have added processing logic for this late-reported data: late data from the current day is written to its corresponding partition, and data from other days is written to a separate delayed partition of the current day. I don't know whether this handling of late data increases the file-processing pressure and makes the COW write take longer.

In addition, I use the bucket index with the number of buckets set to 40, and I keep only the latest version, so there should be at most 80 files in a one-hour partition and 1,920 files in total across one day's partitions. In that case, I shouldn't be creating too many small files at once. I will test increasing the Parquet file size.
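The file-count estimate in the comment above can be written out as plain arithmetic. The factor of two is taken from the comment's own numbers (80 = 40 x 2); the comment does not say what the two files per bucket are, so `files_per_bucket` is an assumed name for that factor.

```python
# The commenter's expected file counts, spelled out.
# files_per_bucket = 2 is inferred from "80 files" for 40 buckets; the
# comment does not say what the two files per bucket are.
num_buckets = 40        # hoodie.bucket.index.num.buckets
files_per_bucket = 2    # assumed factor behind the quoted 80
hour_partitions_per_day = 24

files_per_hour_partition = num_buckets * files_per_bucket
files_per_day = files_per_hour_partition * hour_partitions_per_day
print(files_per_hour_partition, files_per_day)  # 80 1920
```

This bound applies to a bucket-indexed table, where each bucket maps to one file group.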

One more question: I see in the Hudi source code (https://github.com/apache/hudi/blob/release-0.13.0/hudi-common/src/main/java/org/apache/hudi/common/fs/StorageSchemes.java) that COS does not support append by default, yet the COW format does support append mode. Can I simply understand this as Hudi's MOR append and COW append being implemented differently?

DavidZ1 commented 1 year ago

I adjusted the write.parquet.max.file.size parameter to 3000, and the Flink job started to run normally, but it failed after several checkpoints. I checked the sizes of the written files: the largest was 80 MB and the smallest 1 MB. The exception is still the same as before:

2023-03-06 20:51:56.793 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 3 of job 00000000000000000000000000000000 expired before completing.
2023-03-06 20:51:56.795 [jobmanager-future-thread-5] INFO com.tencent.cloud.tstream.flink.OceanusCheckpointListener [] - Begin to post checkpoint failed event
2023-03-06 20:51:56.851 [jobmanager-future-thread-5] INFO com.tencent.cloud.tstream.flink.OceanusCheckpointListener [] - Watchdog response: HttpResponseProxy{HTTP/1.1 200 OK [Content-Type: text/html;charset=UTF-8, Content-Length: 48, Server: Jetty(8.1.19.v20160209)] ResponseEntityProxy{[Content-Type: text/html;charset=UTF-8,Content-Length: 48,Chunked: false]}}
2023-03-06 20:51:57.128 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 4 (type=CHECKPOINT) @ 1678107116797 for job 00000000000000000000000000000000.
2023-03-06 20:51:57.128 [pool-18-thread-1] INFO org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [taking checkpoint 4] success!
2023-03-06 20:52:42.977 [flink-akka.actor.default-dispatcher-24] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: IvcVhrCan Source From Kafka with no watermarks -> hoodie_append_write: ods_icv_can_hudi_temp -> Sink: dummy (17/24) (adf8b18b3e47d3c35adcb79b1a953bb4) switched from RUNNING to FAILED on cql-ncgz4z8e-582618-taskmanager-1-1 @xxxxx (dataPort=xxx).
org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize
    at org.apache.hudi.sink.utils.TimeWait.waitFor(TimeWait.java:57) ~[?:?]
    at org.apache.hudi.sink.common.AbstractStreamWriteFunction.instantToWrite(AbstractStreamWriteFunction.java:276) ~[?:?]
    at org.apache.hudi.sink.append.AppendWriteFunction.initWriterHelper(AppendWriteFunction.java:110) ~[?:?]
    at org.apache.hudi.sink.append.AppendWriteFunction.processElement(AppendWriteFunction.java:84) ~[?:?]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36) ~[?:?]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27) ~[?:?]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128) ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:305) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_332]
2023-03-06 20:52:42.992 [flink-akka.actor.default-dispatcher-18] INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 00000000000000000000000000000000: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=23}]
2023-03-06 20:52:42.994 [flink-akka.actor.default-dispatcher-24] WARN org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Reset the event for task [16]
org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize
    at org.apache.hudi.sink.utils.TimeWait.waitFor(TimeWait.java:57) ~[?:?]
    at org.apache.hudi.sink.common.AbstractStreamWriteFunction.instantToWrite(AbstractStreamWriteFunction.java:276) ~[?:?]
    at org.apache.hudi.sink.append.AppendWriteFunction.initWriterHelper(AppendWriteFunction.java:110) ~[?:?]
    at org.apache.hudi.sink.append.AppendWriteFunction.processElement(AppendWriteFunction.java:84) ~[?:?]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36) ~[?:?]
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27) ~[?:?]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128) ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:305) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_332]

Number of files in a single partition (screenshot attached)

danny0405 commented 1 year ago

What version of Hudi did you use? We previously resolved a bug with partial failover in append mode: https://github.com/apache/hudi/pull/7208/files

The symptom is that when a partial failover happens, the task hangs until the timeout fires. The bug was not resolved until release 0.12.2.

DavidZ1 commented 1 year ago

We are using Hudi version 0.13.0.

danny0405 commented 1 year ago

Was any preceding error thrown before the Timeout(xxx) exceptions appeared?

DavidZ1 commented 1 year ago

I checked the task's running log but did not see any other abnormal information.

danny0405 commented 1 year ago

What is your average checkpoint end-to-end time?

DavidZ1 commented 1 year ago

About five minutes per checkpoint.


hbgstc123 commented 1 year ago

org.apache.hudi.exception.HoodieException: Timeout(601000ms) while waiting for instant initialize.
Was there a checkpoint timeout before this error? It may have been caused by a checkpoint timeout.

danny0405 commented 1 year ago

Your timeout threshold is 10 min, so it should be fine. Why does it take 5 min to checkpoint? Append mode only flushes new Parquet files.

DavidZ1 commented 1 year ago

Isn't it normal for a checkpoint of streaming data to take 5 minutes? Are there any better suggestions for optimizing checkpoints? In theory, the bucket index I use creates only 50 files each time. I don't quite understand what you meant by append mode only flushing new Parquet files.

danny0405 commented 1 year ago

Thanks. For a COW table with the insert operation, Flink does not use any index, so the bucket index has no effect here and the write throughput should be high. For UPSERTs with the bucket index on COW, yes, the performance is bad because almost the whole table/partition is rewritten at each checkpoint.
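The cost of COW upserts described above can be illustrated with a toy write-amplification model. It is a sketch, not a measurement: it assumes that a COW upsert writes a complete new base file for every file group it touches, and that hash bucketing spreads each checkpoint's records over all buckets. Apart from the 40 buckets configured earlier in the thread, every number is hypothetical.

```python
# Toy model of COW upsert write amplification.
# Assumptions: each touched file group gets a full new base file, and
# hash bucketing touches every bucket at each checkpoint. File size and
# incoming volume are hypothetical.
def cow_mb_rewritten(touched_file_groups, base_file_mb):
    """MB written when each touched file group is rewritten in full."""
    return touched_file_groups * base_file_mb


def write_amplification(incoming_mb, touched_file_groups, base_file_mb):
    """Ratio of MB written to storage over MB of new data."""
    return cow_mb_rewritten(touched_file_groups, base_file_mb) / incoming_mb


# 40 buckets (from the thread), 100 MB per base file and 200 MB of new
# data per checkpoint (both hypothetical).
print(write_amplification(incoming_mb=200, touched_file_groups=40, base_file_mb=100))  # 20.0
```

In append mode (COW + insert) the same model gives a ratio near 1, since only new files are written.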

DavidZ1 commented 1 year ago

Thanks. We use insert + COW + bucket index, so is the write throughput poor because the index makes the COW writes behave as upserts? If we change the index, will the performance be fine? Or are there other factors affecting writing?

danny0405 commented 1 year ago

From the DAG in this diagram, it seems you are using the MOR table type + bucket index, and you also have async compaction enabled. By append mode we mean the COW table type + insert operation type. Did you set your write operation to insert through write.operation?

DavidZ1 commented 1 year ago

Yes, we have adjusted the table's write mode. For both MOR and COW tables we have tried insert and upsert, but overall performance did not improve. If append mode means the COW table + insert operation type, which mode does the COW table + upsert operation type use?

danny0405 commented 1 year ago

Maybe you can take this doc as a reference: https://www.yuque.com/yuzhao-my9fz/kb/flqll8?#cJu7y

DavidZ1 commented 1 year ago

Thx, I will read it later.

Vsevolod3 commented 1 year ago

We're having a similar issue with write performance. The Hudi stream_write task takes between 8 and 10 minutes for a MoR table and between 9 and 11 minutes for a CoW table to write 600K records. This is slow even when our S3 destination is completely empty and the corresponding Glue Catalog table doesn't exist. I'm including the details of our testing below:

Test parameters:

CoW Testing

CoW results (sample):

CoW options being set on HoodiePipeline.Builder:

'connector' = 'hudi',
  'index.type' = 'BLOOM',
  'compaction.schedule.enabled' = 'true',
  'clustering.plan.strategy.sort.columns' = 'acct_id',
  'compaction.delta_seconds' = '720',
  'clustering.delta_commits' = '4',
  'clustering.plan.strategy.small.file.limit' = '600',
  'compaction.async.enabled' = 'true',
  'compaction.max_memory' = '100',
  'hoodie.parquet.max.file.size' = '125829120',
  'read.streaming.enabled' = 'false',
  'path' = 's3://*****/*/account/',
  'hoodie.logfile.max.size' = '1073741824',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hoodie.parquet.compression.ratio' = '0.1',
  'hoodie.parquet.small.file.limit' = '104857600',
  'compaction.tasks' = '4',
  'precombine.field' = 'update_ts',
  'write.task.max.size' = '1024.0',
  'hoodie.parquet.compression.codec' = 'snappy',
  'compaction.delta_commits' = '3',
  'clustering.tasks' = '4',
  'compaction.trigger.strategy' = 'num_or_time',
  'read.tasks' = '4',
  'compaction.timeout.seconds' = '1200',
  'clustering.async.enabled' = 'false',
  'table.type' = 'COPY_ON_WRITE',
  'metadata.compaction.delta_commits' = '10',
  'clustering.plan.strategy.max.num.groups' = '30',
  'write.tasks' = '6',
  'clustering.schedule.enabled' = 'false',
  'hoodie.logfile.data.block.format' = 'avro',
  'write.batch.size' = '256.0',
  'write.sort.memory' = '128'

CoW DAG

Attached: CoW_DAG

MoR Testing

MoR results (sample):

MoR options being set on HoodiePipeline.Builder:

'connector' = 'hudi',
  'index.type' = 'BLOOM',
  'compaction.schedule.enabled' = 'true',
  'clustering.plan.strategy.sort.columns' = 'acct_id',
  'compaction.delta_seconds' = '720',
  'clustering.delta_commits' = '4',
  'clustering.plan.strategy.small.file.limit' = '600',
  'compaction.async.enabled' = 'true',
  'compaction.max_memory' = '100',
  'hoodie.parquet.max.file.size' = '125829120',
  'read.streaming.enabled' = 'false',
  'path' = 's3://*****/*/account/',
  'hoodie.logfile.max.size' = '1073741824',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hoodie.parquet.compression.ratio' = '0.1',
  'hoodie.parquet.small.file.limit' = '104857600',
  'compaction.tasks' = '4',
  'precombine.field' = 'update_ts',
  'write.task.max.size' = '1024.0',
  'hoodie.parquet.compression.codec' = 'snappy',
  'compaction.delta_commits' = '3',
  'clustering.tasks' = '4',
  'compaction.trigger.strategy' = 'num_or_time',
  'read.tasks' = '4',
  'compaction.timeout.seconds' = '1200',
  'clustering.async.enabled' = 'false',
  'table.type' = 'MERGE_ON_READ',
  'metadata.compaction.delta_commits' = '10',
  'clustering.plan.strategy.max.num.groups' = '30',
  'write.tasks' = '6',
  'clustering.schedule.enabled' = 'false',
  'hoodie.logfile.data.block.format' = 'avro',
  'write.batch.size' = '256.0',
  'write.sort.memory' = '128'

MoR DAG

Attached: MoR_DAG
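For scale, the timings quoted at the top of this comment (600K records in 8 to 10 minutes for MoR, 9 to 11 minutes for CoW) convert to the record rates below. This is plain arithmetic on the reported numbers and assumes the whole interval was spent writing.

```python
# Convert the reported durations into average records per second.
# Assumption: the whole interval was spent writing the 600K records.
RECORDS = 600_000


def records_per_second(records, minutes):
    """Average throughput for `records` written in `minutes`."""
    return records / (minutes * 60)


mor_best = records_per_second(RECORDS, 8)
mor_worst = records_per_second(RECORDS, 10)
cow_best = records_per_second(RECORDS, 9)
cow_worst = records_per_second(RECORDS, 11)

print(f"MoR: {mor_worst:.0f} to {mor_best:.0f} records/s")
print(f"CoW: {cow_worst:.0f} to {cow_best:.0f} records/s")
```

With 'write.tasks' = '6', that is on the order of 150 to 210 records per second per write task.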

danny0405 commented 1 year ago

The throughput would improve if you used the bucket index instead. BTW, what kind of state backend did you use, and did you enable incremental checkpointing?

Vsevolod3 commented 1 year ago

Thanks for your response. Here are our properties for state-backend (from flink-conf on EMR), and as you can see, incremental checkpointing is enabled:

"state.backend": "rocksdb",
"state.backend.incremental": "true",
"state.checkpoint-storage": "filesystem",
"state.checkpoints.dir": "s3://*****/flink-checkpoints",

Do you see any potential problems with these configs?

In relation to using the bucket index, are these the correct properties to try? I will do some testing with the defaults, but if you have any suggestions for improving these, please let me know.

hoodie.index.type=BUCKET
hoodie.bucket.index.hash.field=<some field, probably PK>
hoodie.bucket.index.num.buckets=256
hoodie.index.bucket.engine=SIMPLE

Thanks in advance.

Vsevolod3 commented 1 year ago

@danny0405 : to update, I did try the bucket index, and the performance is still in the 9-10 minute range. Here are the timings for the tasks:

Here are the Hudi properties submitted to the sink builder:

  'connector' = 'hudi',
  'compaction.schedule.enabled' = 'true',
  'hoodie.index.bucket.engine' = 'SIMPLE',
  'hoodie.index.type' = 'BUCKET',
  'clustering.plan.strategy.sort.columns' = 'acct_id',
  'write.bucket_assign.tasks' = '6',
  'compaction.delta_seconds' = '720',
  'clustering.delta_commits' = '4',
  'clustering.plan.strategy.small.file.limit' = '600',
  'compaction.async.enabled' = 'true',
  'compaction.max_memory' = '100',
  'hoodie.parquet.max.file.size' = '125829120',
  'read.streaming.enabled' = 'false',
  'path' = 's3://*****/*/account/',
  'hoodie.logfile.max.size' = '1073741824',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hoodie.parquet.compression.ratio' = '0.1',
  'hoodie.parquet.small.file.limit' = '104857600',
  'hoodie.bucket.index.hash.field' = 'acct_id',
  'compaction.tasks' = '4',
  'precombine.field' = 'update_ts',
  'write.task.max.size' = '1024.0',
  'hoodie.parquet.compression.codec' = 'snappy',
  'compaction.delta_commits' = '3',
  'clustering.tasks' = '4',
  'compaction.trigger.strategy' = 'num_or_time',
  'hoodie.bucket.index.num.buckets' = '256',
  'read.tasks' = '4',
  'compaction.timeout.seconds' = '1200',
  'clustering.async.enabled' = 'false',
  'table.type' = 'COPY_ON_WRITE',
  'metadata.compaction.delta_commits' = '10',
  'clustering.plan.strategy.max.num.groups' = '30',
  'write.tasks' = '6',
  'clustering.schedule.enabled' = 'false',
  'hoodie.logfile.data.block.format' = 'avro',
  'write.batch.size' = '256.0',
  'write.sort.memory' = '128'

Also, the metadata on the parquet files written by Hudi still has hoodie_bloom_filter_type_code=DYNAMIC_V0 and org.apache.hudi.bloomfilter in it, like it's still using BLOOM index. Is this expected?

danny0405 commented 1 year ago

I'm guessing you read Chinese; here is a Chinese doc for the bucket index: https://www.yuque.com/yuzhao-my9fz/kb/flqll8?#g3rZq

Vsevolod3 commented 1 year ago

Thanks for the link, Danny. I don't speak Chinese, so I'm not sure what most of it says. I will try to apply auto-translate to that page. In the meantime, do you have anything similar that's in English?

danny0405 commented 1 year ago

Sorry, let me explain the options here. Basically, there are two options you need to take care of for the bucket index:

  1. index.type: set it to BUCKET;
  2. hoodie.bucket.index.num.buckets: the number of buckets, 4 by default.

We also have a page for Flink in English on the Hudi website: https://hudi.apache.org/docs/hoodie_deltastreamer#bucket-index
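
To illustrate the idea behind a bucket index, here is a minimal sketch of mapping a record key to a bucket by hashing. It is not Hudi's implementation: CRC32 is a stand-in hash chosen for determinism, and the `acct-<n>` keys are hypothetical. The point is only that the target bucket can be computed from the key itself, without looking anything up in existing files.

```python
import zlib


def bucket_id(record_key: str, num_buckets: int) -> int:
    """Map a record key to a bucket number in [0, num_buckets)."""
    # CRC32 is an assumption made for this sketch; Hudi uses its own hash function.
    return zlib.crc32(record_key.encode("utf-8")) % num_buckets


# Hypothetical keys, spread over the default of 4 buckets mentioned above.
keys = [f"acct-{i}" for i in range(1000)]
counts = [0] * 4
for key in keys:
    counts[bucket_id(key, 4)] += 1

print(counts)  # number of keys that landed in each bucket
```

The same key always maps to the same bucket, which is why the bucket count has to be chosen with the expected data volume in mind.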

Vsevolod3 commented 1 year ago

Thank you, Danny. Yes, I got the best performance by using BUCKET index with non-nested, numeric, non-hive partitions. We were actually able to get it to perform in under 3 minutes for the stream_write task when we picked a partitioning field that resulted in fewer partitions (14 partitions) compared to our previous tests (> 90 partitions).

It seems the number of partitions Hudi has to manage has a very large impact on performance. Would you be able to share any documents or blog posts that explain partitioning for performance further? I read through most of the documents under Concepts on the Hudi website (e.g. https://hudi.apache.org/docs/next/indexing), but didn't find a lot dealing in depth with partitioning strategies.
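
One rough way to reason about this, assuming the bucket index keeps a fixed number of buckets in every partition (so that file groups scale as partitions times buckets): the arithmetic below uses the 256 buckets from the config above and the two partition counts mentioned in this comment. Whether 256 buckets were used in both tests is an assumption, and 90 is a lower bound since the earlier tests had more than 90 partitions.

```python
def file_groups(num_partitions: int, buckets_per_partition: int) -> int:
    """Upper bound on file groups if every partition holds every bucket."""
    return num_partitions * buckets_per_partition


BUCKETS = 256  # hoodie.bucket.index.num.buckets from the config above

for partitions in (14, 90):
    print(partitions, "partitions ->", file_groups(partitions, BUCKETS), "file groups")
```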

danny0405 commented 1 year ago

partitioning strategies

For partitioning strategies, are you referring to the data buckets (FileGroups in Hudi's terminology) or just regular directories like in Hive? If it is the former, here is the doc about the Hudi file layout: https://hudi.apache.org/docs/file_layouts.

For a CoW table, each update to an existing file group triggers a rewrite of the whole parquet file, so the write amplification is large.

For a MoR table, Hudi just appends the new records first, and the data compaction task is deferred to an async execution.
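
A toy model of the difference, not a measurement: it assumes a CoW update rewrites one full base file whose size stays about the same, and that a MoR update only appends the changed records to a log file, ignoring the later compaction cost. The 120 MB base file size is `hoodie.parquet.max.file.size` from the configs above; the 1 MB update batch is hypothetical.

```python
MB = 1024 * 1024


def cow_bytes_written(base_file_bytes: int, update_bytes: int) -> int:
    """CoW: the whole base file is rewritten with the updates merged in."""
    # Updates replace existing records, so the rewritten file is about the same size.
    return base_file_bytes


def mor_bytes_written(base_file_bytes: int, update_bytes: int) -> int:
    """MoR: only the new records are appended; compaction happens later."""
    return update_bytes


base = 125829120  # 120 MB, hoodie.parquet.max.file.size from the configs above
update = 1 * MB   # hypothetical update batch touching one file group

print("CoW amplification:", cow_bytes_written(base, update) / update)
print("MoR amplification:", mor_bytes_written(base, update) / update)
```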

HuangZhenQiu commented 1 month ago

@danny0405 Do we have some best practices (COW and MOR) for Flink ingestion to Hudi?

danny0405 commented 1 month ago
  1. The bucket index is always recommended if cross-partition updates are not needed or if resources are crucial for users.
  2. MoR is preferable to CoW for streaming ingestion.
  3. Datetime partitioning for the table is suggested (to relieve the pressure on the driver memory usage).
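
As a sketch only, the three suggestions could be collected into sink options like the ones posted earlier in this thread. The option keys are taken from the thread; the bucket count of 256 and the UTC day-level partition value are assumptions, not recommendations from the maintainers, and `render_options` and `date_partition` are hypothetical helpers.

```python
from datetime import datetime, timezone

# Option keys as they appear earlier in this thread; values are illustrative.
OPTIONS = {
    "table.type": "MERGE_ON_READ",             # suggestion 2: MoR for streaming ingestion
    "index.type": "BUCKET",                    # suggestion 1: bucket index
    "hoodie.bucket.index.num.buckets": "256",  # assumption: value used earlier in the thread
}


def render_options(options: dict) -> str:
    """Render options in the 'key' = 'value' style used in this thread."""
    return ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())


def date_partition(epoch_seconds: int) -> str:
    """Suggestion 3: derive a day-level partition value (UTC is an assumption)."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime("%Y-%m-%d")


print(render_options(OPTIONS))
print(date_partition(0))
```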