apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Apache Hudi Auto-Size During Writes is not Working for Flink SQL #10622

Closed vkhoroshko closed 5 months ago

vkhoroshko commented 5 months ago

To Reproduce

Steps to reproduce the behavior:

  1. Use Flink SQL with the file below.

Current behavior: A separate parquet file is produced with every Flink commit (during checkpointing).

Expected behavior: Data is appended to existing parquet file(s) until the max file size threshold is met.

Additional context: The expectation (as described in the Apache Hudi docs, https://hudi.apache.org/docs/file_sizing#auto-sizing-during-writes) is that with every Flink commit (every minute) the accumulated records are written into one of the existing parquet files until the max parquet file size threshold is met (5 MB in the example below). However, every commit instead produces a separate parquet file (~400 KB); these files accumulate and are never merged. Please help.

SQL file:

SET 'parallelism.default' = '1';
SET 'execution.checkpointing.interval' = '1m';

CREATE TABLE datagen
(
    id   INT NOT NULL PRIMARY KEY NOT ENFORCED,
    data STRING
) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '5'
);

CREATE TABLE hudi_tbl
(
    id   INT NOT NULL PRIMARY KEY NOT ENFORCED,
    data STRING
) WITH (
      'connector' = 'hudi',
      'path' = 'file:///opt/hudi',
      'table.type' = 'COPY_ON_WRITE',
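      -- note: Hudi's Flink sizing options are specified in MB, so '5' below
      -- targets ~5 MB parquet files (matching the 5 MB threshold described above)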
      'write.parquet.block.size' = '1',
      'write.operation' = 'insert',
      'write.parquet.max.file.size' = '5'
);

INSERT INTO hudi_tbl SELECT * from datagen;

danny0405 commented 5 months ago

That is because you are using the append mode: the append mode relies on async clustering to merge small files. You can switch to the upsert operation for your tests; upsert merges the files during write.
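
For example, a minimal sketch of the same table switched to upsert (schema, path, and the other options as in the original DDL; the key change is 'write.operation'):

CREATE TABLE hudi_tbl
(
    id   INT NOT NULL PRIMARY KEY NOT ENFORCED,
    data STRING
) WITH (
      'connector' = 'hudi',
      'path' = 'file:///opt/hudi',
      'table.type' = 'COPY_ON_WRITE',
      -- upsert merges incoming records into existing small file groups on each
      -- write, so files grow toward write.parquet.max.file.size instead of piling up
      'write.operation' = 'upsert',
      'write.parquet.max.file.size' = '5'
);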

vkhoroshko commented 5 months ago

> That is because you are using the append mode: the append mode relies on async clustering to merge small files. You can switch to the upsert operation for your tests; upsert merges the files during write.

Thank you, it looks like it's working fine now. However, is this mentioned anywhere in the documentation? All I see is the note that "the bulk_insert write operation does not have auto-sizing capabilities during ingestion".

Another question: by adding the following property, I was able to achieve the same for the insert operation type:

'write.insert.cluster' = 'true',
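
For reference, a sketch of how it slots into the WITH clause of the DDL above (everything else unchanged):

) WITH (
      'connector' = 'hudi',
      'path' = 'file:///opt/hudi',
      'table.type' = 'COPY_ON_WRITE',
      'write.operation' = 'insert',
      -- merge small files inline as part of each insert commit
      'write.insert.cluster' = 'true',
      'write.parquet.max.file.size' = '5'
);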

This property isn't well documented either; I found it by digging into the source code.

Is there actually any difference, or is the overall recommendation to use async clustering?

Thanks in advance

danny0405 commented 5 months ago

'write.insert.cluster' = 'true' is actually inline clustering, which means it performs the clustering on each write, so the write throughput is lower compared to async clustering.

Sorry it is not well documented.
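
For comparison, the async path for insert is enabled with options along these lines (a sketch based on Hudi's Flink options; exact availability and defaults depend on the Hudi version):

) WITH (
      'connector' = 'hudi',
      'path' = 'file:///opt/hudi',
      'table.type' = 'COPY_ON_WRITE',
      'write.operation' = 'insert',
      -- schedule clustering plans on the writer...
      'clustering.schedule.enabled' = 'true',
      -- ...and execute them asynchronously, off the hot write path
      'clustering.async.enabled' = 'true',
      -- generate a clustering plan every N commits (assumed tuning knob)
      'clustering.delta_commits' = '4'
);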

ad1happy2go commented 5 months ago

Created a tracking JIRA to fix the docs: https://issues.apache.org/jira/browse/HUDI-7396

Thanks @danny0405 @vkhoroshko. Closing this.