apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Memory Exception when building BuildProfile #7565

Open jomach opened 1 year ago

jomach commented 1 year ago


Describe the problem you faced

We are running Apache Hudi on AWS Glue with 3 G.2X workers and reading a relatively small amount of data. The job ends up failing in org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:191). We are running Hudi 0.10.0 and Spark 3.1.1.

(Two screenshots attached: 2022-12-27 at 13:01:44 and 12:50:26.)

If we increase the number of workers to 6, the job usually works. I think it has to do with skewed keys, but I'm new to this project.
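One quick way to sanity-check the skew suspicion before the Hudi write is to count records per partition value. A minimal sketch, assuming the dataframe is already loaded; partition_col is a hypothetical column name for this dataset:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.col;

final class SkewCheck {
  // Print the most populated partition values; a few huge groups next to many
  // tiny ones would point at skewed keys rather than overall data volume.
  static void topPartitions(Dataset<Row> df) {
    df.groupBy(col("partition_col"))
      .count()
      .orderBy(col("count").desc())
      .show(20, false);
  }
}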


Environment Description

Hudi version : 0.10.0

Spark version : 3.1.1

Runtime : AWS Glue with 3 G.2X workers

Stacktrace

ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

I think the issue is here:

HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
WorkloadStat globalStat = new WorkloadStat();

Map<Pair<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecords
    .stream()
    .map(record -> Pair.of(
        Pair.of(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
    .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));

We do a collect into memory; if there are many keys, or the keys are very long, this becomes an issue. What if we implemented a WorkloadStat that acts as a Collector, and made the HashMap inside WorkloadStat use something more compact than strings as keys?
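For illustration, a minimal sketch of that direction, assuming only the HoodieRecord accessors already used in the snippet above. PartitionCounts is a hypothetical stand-in for WorkloadStat, and the sketch collapses the per-file-group detail; it is only meant to show the single-pass, compact-key idea, not a drop-in replacement:

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collector;

import org.apache.hudi.common.model.HoodieRecord;

// Hypothetical stand-in for WorkloadStat: two counters per partition path,
// so no Pair<String, Option<HoodieRecordLocation>> keys are materialized.
class PartitionCounts {
  long inserts;   // records without a current location (new file groups)
  long updates;   // records already mapped to an existing file group

  void add(HoodieRecord<?> record) {
    if (record.getCurrentLocation() == null) {
      inserts++;
    } else {
      updates++;
    }
  }

  PartitionCounts merge(PartitionCounts other) {
    inserts += other.inserts;
    updates += other.updates;
    return this;
  }
}

final class WorkloadCollector {
  // Single-pass Collector: folds each record into the counters for its
  // partition path instead of grouping by a composite (path, location) key.
  static Collector<HoodieRecord<?>, ?, Map<String, PartitionCounts>> byPartition() {
    return Collector.of(
        HashMap::new,
        (map, record) -> map
            .computeIfAbsent(record.getPartitionPath(), k -> new PartitionCounts())
            .add(record),
        (left, right) -> {
          right.forEach((k, v) -> left.merge(k, v, PartitionCounts::merge));
          return left;
        });
  }
}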

yihua commented 1 year ago

Hi @jomach, thanks for raising the issue. If you haven't already, please check out the Tuning Guide for writing data to a Hudi table through a Spark job.

We'll revisit the logic of the code snippet you pasted. Usually, the number of combinations of partition path and record location (parquet file path) should be limited.
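For reference, a minimal sketch of the write-side options that guide usually points to first. The table name, path, field names, and parallelism values below are illustrative only, not recommendations for this specific job:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

final class TunedHudiWrite {
  // Upsert with explicit shuffle parallelism; sizing these to the input volume
  // (together with executor memory) is usually the first tuning step.
  static void upsert(Dataset<Row> df, String tableName, String basePath) {
    df.write()
      .format("hudi")
      .option("hoodie.table.name", tableName)
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.recordkey.field", "id")    // hypothetical key field
      .option("hoodie.datasource.write.precombine.field", "ts")   // hypothetical precombine field
      .option("hoodie.upsert.shuffle.parallelism", "200")         // illustrative value
      .option("hoodie.insert.shuffle.parallelism", "200")         // illustrative value
      .mode(SaveMode.Append)
      .save(basePath);
  }
}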

xushiyan commented 1 year ago
inputRecords
        .mapToPair(record -> Pair.of(
            new Tuple2<>(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
        .countByKey();

You should refer to org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor#buildProfile, which is the code path used by Spark.

I think this is more of a Spark job tuning issue, where parallelism and executor memory should be tuned.
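Concretely, the kind of Spark-side settings meant here (a sketch; on Glue the per-executor memory is largely fixed by the worker type, so the parallelism settings and the worker count are the practical levers, and the numbers are illustrative):

import org.apache.spark.sql.SparkSession;

final class TunedSession {
  // Raise default and shuffle parallelism so the workload-profile and upsert
  // stages split the data into more, smaller tasks per executor.
  static SparkSession build() {
    return SparkSession.builder()
        .appName("hudi-upsert-job")                     // hypothetical app name
        .config("spark.default.parallelism", "200")     // illustrative value
        .config("spark.sql.shuffle.partitions", "200")  // illustrative value
        .getOrCreate();
  }
}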

Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

Any further info on this?

jomach commented 1 year ago

The executors are being killed due to memory exceptions (OOM).