apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Flink MOR only creates log files #9663

Open jmnatzaganian opened 1 year ago

jmnatzaganian commented 1 year ago

Describe the problem you faced

When using Flink with an MOR table, only log files are created. This differs from Spark, which only writes log files when changes to an existing file group occur. Log files are not optimized for large payloads, which results in a costly performance hit when flushing the data. The end result is a pipeline that quickly buffers data, blocks while converting the data, and then flushes. For base files, the heavy lifting is handed off to the Parquet writer, which avoids these complications by incrementally processing data at the row-group level. In that situation the expectation is relatively flat CPU usage, since serialization and compression occur in parallel with reads.

Over-provisioning helps to a degree, but is limited by the batch size. Reducing the batch size helps significantly but results in too many small files. Compaction can be used to address that, but it adds a large cost in S3 I/O and in running the compaction job itself.
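For reference, these are the kinds of Flink write-path options the batch-size and compaction tuning above refers to. This is only a sketch: the option keys follow Hudi's Flink configuration, the values are illustrative, and defaults/semantics may differ by release.

```java
// Illustrative only: Hudi Flink write options related to the buffering/flush behavior
// described above. Keys follow Hudi's FlinkOptions; values here are placeholders.
import java.util.HashMap;
import java.util.Map;

public class WriteTuningOptions {
  public static Map<String, String> options() {
    Map<String, String> opts = new HashMap<>();
    opts.put("write.batch.size", "256");           // in-memory buffer size (MB) that triggers a flush
    opts.put("write.task.max.size", "1024");       // total memory budget (MB) per write task
    opts.put("compaction.delta_commits", "5");     // delta commits before a compaction is scheduled
    opts.put("compaction.async.enabled", "false"); // table services run separately in our setup
    return opts;
  }
}
```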

To Reproduce

See the attached file, Demo.java, from @kzdravkov. This is a self-contained Flink example illustrating the behavior.
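Since the attachment isn't inlined, here is a rough, hypothetical sketch of this kind of job (not the actual Demo.java): a Flink SQL pipeline upserting a datagen stream into a local MOR table. The schema, table path, and datagen source are placeholders, and the option keys follow the Hudi Flink connector docs for recent releases.

```java
// Hypothetical reproduction sketch (not the attached Demo.java): upsert into a local MOR table.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MorLogOnlyRepro {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10_000L); // the Hudi Flink writer commits on checkpoint
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Throw-away datagen source standing in for the real stream.
    tEnv.executeSql(
        "CREATE TABLE src (id STRING, ts TIMESTAMP(3), payload STRING) "
            + "WITH ('connector' = 'datagen', 'rows-per-second' = '1000')");

    // MOR table, upsert operation, FLINK_STATE index; async compaction disabled
    // because table services run as separate Spark jobs in our setup.
    tEnv.executeSql(
        "CREATE TABLE hudi_sink (id STRING PRIMARY KEY NOT ENFORCED, ts TIMESTAMP(3), payload STRING) "
            + "WITH ("
            + " 'connector' = 'hudi',"
            + " 'path' = 'file:///tmp/hudi/mor_demo',"
            + " 'table.type' = 'MERGE_ON_READ',"
            + " 'write.operation' = 'upsert',"
            + " 'index.type' = 'FLINK_STATE',"
            + " 'compaction.async.enabled' = 'false'"
            + ")");

    // Observed behavior: even for brand-new record keys (pure inserts),
    // only .log files appear under /tmp/hudi/mor_demo -- no parquet base files.
    tEnv.executeSql("INSERT INTO hudi_sink SELECT id, ts, payload FROM src").await();
  }
}
```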

Expected behavior

See the attached file hudi_spark_mor.py. This is a self-contained PySpark example that shows the expected behavior. In short: whenever inserts occur, a base file is expected to be created. Additionally, the small-file bin-packing logic is expected to kick in as appropriate.
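Since that script isn't inlined either, a rough illustration of the comparison point follows (using Spark's Java API rather than PySpark; columns, table name, and path are placeholders, not the actual attachment). With Spark's MOR upsert path, a batch of brand-new keys is written out as parquet base files, and only updates to existing file groups land in log files.

```java
// Hypothetical sketch (not the attached hudi_spark_mor.py): a Spark MOR upsert
// where inserting new keys produces parquet base files rather than log files.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

public class MorBaseFileDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-mor-base-file-demo")
        .master("local[2]")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // required by Hudi
        .getOrCreate();

    // Placeholder data: 100 brand-new keys, i.e. pure inserts.
    Dataset<Row> inserts = spark.range(0, 100)
        .withColumn("key", col("id").cast("string"))
        .withColumn("ts", current_timestamp())
        .withColumn("part", lit("p0"));

    inserts.write().format("hudi")
        .option("hoodie.table.name", "mor_demo")
        .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
        .option("hoodie.datasource.write.operation", "upsert")
        .option("hoodie.datasource.write.recordkey.field", "key")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.datasource.write.partitionpath.field", "part")
        .mode(SaveMode.Append)
        .save("file:///tmp/hudi/mor_demo_spark"); // new keys -> parquet base files, not .log files

    spark.stop();
  }
}
```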

Environment Description

Flink is running in Kubernetes. Data is in S3. The catalog uses Glue. For the purposes of this issue, a local setup is sufficient for demonstration.

Additional context

@kzdravkov initially started this discussion in Slack here with @danny0405 and others. The thread has additional details about our use case, but I'll include the important notes below:

- The data volume is modest: 50-100k records/s at roughly 100 MB/s uncompressed.
- We are trying to migrate an existing Flink job that writes a plain Parquet table to Hudi.
- The job is insert-heavy but does have upserts, so the write operation needs to be set to upsert.
- Our data model requires a global index to ensure we don't have duplicates (the record --> partition mapping can change). We have mitigated this with a pseudo-global index by using the FLINK_STATE index, which addresses most of the duplicates.
- All of the table services are run independently via Spark (currently Glue jobs); see the sketch below.
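As an illustration of what running compaction as an independent Spark job can look like, here is a minimal sketch using Hudi's Spark SQL procedures. The procedure name and arguments are based on recent Hudi docs and may not match our actual Glue jobs or every Hudi version; the table path is a placeholder.

```java
// Hypothetical sketch of offline compaction driven by a separate Spark job.
// Procedure names/arguments follow recent Hudi Spark SQL procedure docs and may
// differ by Hudi version.
import org.apache.spark.sql.SparkSession;

public class OfflineCompactionJob {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-offline-compaction")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // Hudi's SQL procedures are exposed through its Spark session extension.
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .getOrCreate();

    String tablePath = "s3://my-bucket/path/to/table"; // placeholder

    // Schedule a compaction plan for the accumulated log files, then execute it.
    spark.sql("CALL run_compaction(op => 'schedule', path => '" + tablePath + "')").show();
    spark.sql("CALL run_compaction(op => 'run', path => '" + tablePath + "')").show();

    spark.stop();
  }
}
```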

Given that this is blocking a rollout, we are planning to test our job with COW to see if it's good enough, and then migrate to MOR once this is addressed.

For anyone curious about the log file overhead, please see the Slack thread above. I believe it is also an issue, but as long as log blocks are kept small it's minor and likely not worth the effort. I'd like to exclude it from this issue, since it's a separate, secondary concern.

danny0405 commented 1 year ago

@yihua Do you have interest in joining this discussion to investigate more efficient log append writers? Currently the log file encoding is not that efficient if the user switches to the parquet bytes log blocks.
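For readers following along, "parquet bytes log blocks" refers to writing the MOR log data block payload as parquet instead of the default avro encoding. A minimal sketch of how that switch is typically configured, assuming the hoodie.logfile.data.block.format storage option (verify availability against the Hudi version in use):

```java
// Illustrative only: switching the MOR log data block payload to parquet bytes.
// The option key is an assumption based on Hudi's storage configs; check your version.
import java.util.HashMap;
import java.util.Map;

public class LogBlockFormatOption {
  public static Map<String, String> options() {
    Map<String, String> opts = new HashMap<>();
    // Default data block encoding is avro; "parquet" embeds parquet-encoded bytes
    // in each log block instead.
    opts.put("hoodie.logfile.data.block.format", "parquet");
    return opts;
  }
}
```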