apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] How to resolve small files? #8651

Open c-f-cooper opened 1 year ago

c-f-cooper commented 1 year ago


Describe the problem you faced

We ingest log data into Hudi using 'COW + insert', i.e. append mode. We found some small files on HDFS, so we used clustering to resolve the problem, but it had no effect.


danny0405 commented 1 year ago

What kind of clustering are you using, online or offline? By 'no effect', do you mean that no bigger Parquet files are generated to replace the existing ones?

ad1happy2go commented 1 year ago

@c-f-cooper Are you using Insert or Bulk Insert? Can you please share the clustering command and the table configs you are using?

c-f-cooper commented 1 year ago

What kind of clustering are you using, online or offline? By 'no effect', do you mean that no bigger Parquet files are generated to replace the existing ones?

We use online async clustering. There are still some small files and no bigger Parquet files, although I did find the replacecommit file in the .hoodie directory.

c-f-cooper commented 1 year ago

@c-f-cooper Are you using Insert or Bulk Insert? Can you please share the clustering command and the table configs you are using?

We use COW + insert mode. Apart from the clustering config, the other configs are left at their defaults.

danny0405 commented 1 year ago

You did not enable async clustering, right? We have inline clustering, async clustering, and offline clustering; which one are you using?

c-f-cooper commented 1 year ago

You did not enable async clustering, right? We have inline clustering, async clustering, and offline clustering; which one are you using?

We use async clustering with the Flink DataStream API. We set two config items, clustering.schedule.enabled and clustering.async.enabled, both to true.
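For readers following the thread, here is a minimal sketch of how those two options are typically combined with the clustering plan-strategy knobs. It is expressed as Flink SQL table options for brevity; the same keys go into the options map of a DataStream pipeline. The table name, columns, path, and the size/trigger values are illustrative assumptions, not taken from the reporter's job:

    -- Sketch: COW + insert with clustering scheduled and executed asynchronously in the writing job.
    CREATE TABLE hudi_logs (
      ts BIGINT,
      uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
      msg VARCHAR(2000)
    ) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs:///tmp/hudi_logs',                  -- hypothetical path
      'table.type' = 'COPY_ON_WRITE',
      'write.operation' = 'insert',
      'clustering.schedule.enabled' = 'true',            -- generate clustering plans from the writing job
      'clustering.async.enabled' = 'true',               -- execute those plans asynchronously
      'clustering.delta_commits' = '4',                  -- commits between two clustering plans (illustrative)
      'clustering.plan.strategy.small.file.limit' = '600',             -- files below this size (in MB) are candidates
      'clustering.plan.strategy.target.file.max.bytes' = '1073741824'  -- aim for roughly 1 GB output files
    );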

danny0405 commented 1 year ago

Does the async clustering run as part of the normal job? Can you paste the DAG of the running job?

c-f-cooper commented 1 year ago

Yes, it has taken effect: the clustering task appears in the DAG, and the replacecommit appears in the .hoodie directory.

c-f-cooper commented 1 year ago

(screenshots attached)

I found that the async clustering schedule is done, but it does not execute. @danny0405

danny0405 commented 1 year ago

Can you add more parallelism to the clustering task?

c-f-cooper commented 1 year ago

Can you add more parallelism to the clustering task?

When I increased the parallelism to 16, the clustering task's executor ran.

danny0405 commented 1 year ago

Good news, maybe it is because there are too many pending clustering tasks blocked in the queue.
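For anyone hitting the same symptom (plans are scheduled but never executed), the clustering executor's parallelism can be raised on its own instead of raising the whole job's parallelism. A minimal sketch using a Flink SQL dynamic-options hint against the hypothetical hudi_logs table from the sketch above; clustering.tasks is the connector option assumed here, the value 16 simply mirrors what worked above, and raw_logs is an assumed source table:

    -- Sketch: give the clustering executor its own parallelism so pending plans get drained.
    INSERT INTO hudi_logs /*+ OPTIONS(
      'clustering.tasks' = '16'   -- parallelism of the clustering executor operator
    ) */
    SELECT ts, uuid, msg FROM raw_logs;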

ad1happy2go commented 1 year ago

@c-f-cooper Were you able to resolve this issue? Was it the same one as @danny0405 mentioned?

soumilshah1995 commented 1 year ago

Hey there, try adding this config; I have tried it and it works for me:

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512 MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100 MB
    }


Hope this helps

c-f-cooper commented 1 year ago

Hey there, try adding this config; I have tried it and it works for me:

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512 MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100 MB
    }

Hope this helps

These configs belong to file sizing; are they effective for clustering?

danny0405 commented 1 year ago

The file sizing should still be valid.

c-f-cooper commented 1 year ago

The file sizing should still be valid.

I will try it.

jlloh commented 1 year ago

Sorry to hop on the thread, but @danny0405 I'm using a similar setup to OP (Flink + Async Clustering + COW + Insert), but writing to S3 instead of HDFS. I'm also getting small files, but I realised that the number of files written basically corresponds to the environment parallelism of the Flink environment.

I tried tweaking write.bucket_assign.tasks but it doesn't seem to work; I also tried tweaking the Parquet size configurations, but they don't seem to take effect. Flink still seems to write out as many files as the parallelism: in my case the parallelism is 30, so it's writing out 30 files of ~5 MB each.

I guess async clustering could solve this later on, but I saw this line in the documentation:

NOTE: Clustering can only be scheduled for tables / partitions not receiving any concurrent updates. In the future, concurrent updates use-case will be supported as well.

Does this mean that if a partition is currently being written to (e.g. I do a daily partition), the clustering task won't be able to run to cluster the files until after the day has passed and the writer stops writing to the partition? I.e. clustering will be one day delayed.

My DAG for reference: (screenshot attached)

My table configuration: (screenshot attached)

danny0405 commented 1 year ago

I guess async clustering could solve this later on, but I saw this line in the documentation:

It's a mistake in the documentation; you can schedule the clustering plan directly in the separate clustering job.

Another choice is to schedule the plan in the writing job and execute it in a separate job.
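A minimal sketch of that second choice, reusing the hypothetical hudi_logs and raw_logs tables from the earlier sketches: the writing job only schedules plans, and a separate clustering job (for example Hudi's offline Flink clustering job, if your version ships one) executes them. Option names come from Hudi's Flink connector; everything else is illustrative:

    -- Sketch: schedule-only mode in the writing job; execution happens in a separate clustering job.
    INSERT INTO hudi_logs /*+ OPTIONS(
      'clustering.schedule.enabled' = 'true',   -- keep generating clustering plans while writing
      'clustering.async.enabled' = 'false'      -- do not execute the plans inside this writing job
    ) */
    SELECT ts, uuid, msg FROM raw_logs;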

vkhoroshko commented 8 months ago

Hello Guys,

I believe I am facing a similar issue.

I create a COW table in append-only mode and expect the auto file sizing feature to work (as described in the documentation). I specifically set the small file limit and max file size low (for testing):


CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
WITH (
  'connector' = 'hudi',
  'path' = 'file:///opt/flink/hudi',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation'='insert',
  'parquet.small.file.limit'='500000',
  'parquet.max.file.size'='1000000'
);

But for every insert (and commit), a separate Parquet file is created, and the files are never merged.
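For comparison, a minimal sketch of the Flink-connector-level knobs commonly pointed at for this case, assuming the options write.insert.cluster (merge existing small file groups during a COW insert) and write.parquet.max.file.size (target file size, in MB) are available in the Hudi version in use; the table name, path, and the deliberately tiny size are illustrative only:

    -- Sketch: COW + insert with small-file merging on write (Flink connector options).
    CREATE TABLE hudi_table_sized (
        ts BIGINT,
        uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
        rider VARCHAR(20),
        driver VARCHAR(20),
        fare DOUBLE,
        city VARCHAR(20)
    )
    WITH (
      'connector' = 'hudi',
      'path' = 'file:///opt/flink/hudi_sized',   -- illustrative path
      'table.type' = 'COPY_ON_WRITE',
      'write.operation' = 'insert',
      'write.insert.cluster' = 'true',           -- merge existing small file groups while inserting
      'write.parquet.max.file.size' = '1'        -- target file size in MB (tiny here only for testing)
    );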