apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] OOM during a Sync/Async clean operation #8199

Open gudladona opened 1 year ago

gudladona commented 1 year ago

OOM during a Sync or Async clean operation

ENV:

Hudi version: 0.11.1
Java version: 1.8
Spark version: 3.1.2
EMR version: 6.4
Clean policy: KEEP_LATEST_BY_HOURS, 24 hours (default)
Clean parallelism: 200 (default)
Metadata table: disabled

We have been experiencing consistent OOM errors when running the Hudi DeltaStreamer job in continuous mode. The OOM occurs during the "Generating list of file slices to be cleaned" phase. The image below shows the heap growth during the clean operation. The growth happens chiefly during two API calls made from the executors against the file system view: getReplacedFileGroupsBefore and getAllFileGroups.

[image: driver heap growth during the clean operation]

We also have JFR files containing memory profiles captured during a failed clean operation; GitHub does not allow us to attach them to this issue.

We also tried setting hoodie.embed.timeline.server.async: true, which reduced heap usage and allowed the clean to succeed. The reduction seems to come from the single-threaded nature of the async executor. With this setting we observe the heap usage below; however, the clean operation is slightly less performant. We will continue to use this setting for now and post any new observations on this ticket.

[image: heap usage with hoodie.embed.timeline.server.async enabled]
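For reference, the workaround described above is a single Hudi property. A sketch of the setting (how you pass it depends on your deployment, e.g. a properties file or `--hoodie-conf` on the DeltaStreamer command line):

```
# Serve embedded timeline-server responses through the single-threaded
# async executor (reduced peak driver heap for us, at the cost of
# slightly slower cleans).
hoodie.embed.timeline.server.async=true
```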

[image: flame graph for the async clean with the async timeline server]

[image: flame graph for the async clean with the sync timeline server]

Expected behavior

A clean operation should not fail with an OOM.


Stacktrace


  at java.lang.OutOfMemoryError.<init>()V (OutOfMemoryError.java:48)
  at java.lang.StringCoding.encode(Ljava/nio/charset/Charset;[CII)[B (StringCoding.java:350)
  at java.lang.String.getBytes(Ljava/nio/charset/Charset;)[B (String.java:941)
  at io.javalin.Context.result(Ljava/lang/String;)Lio/javalin/Context; (Context.kt:364)
  at org.apache.hudi.timeline.service.RequestHandler.writeValueAsStringSync(Lio/javalin/Context;Ljava/lang/Object;)V (RequestHandler.java:210)
  at org.apache.hudi.timeline.service.RequestHandler.writeValueAsString(Lio/javalin/Context;Ljava/lang/Object;)V (RequestHandler.java:176)
  at org.apache.hudi.timeline.service.RequestHandler.lambda$registerFileSlicesAPI$18(Lio/javalin/Context;)V (RequestHandler.java:384)
  at org.apache.hudi.timeline.service.RequestHandler$$Lambda$2356.handle(Lio/javalin/Context;)V (Unknown Source)
  at org.apache.hudi.timeline.service.RequestHandler$ViewHandler.handle(Lio/javalin/Context;)V (RequestHandler.java:501)
  at io.javalin.security.SecurityUtil.noopAccessManager(Lio/javalin/Handler;Lio/javalin/Context;Ljava/util/Set;)V (SecurityUtil.kt:22)
  at io.javalin.Javalin$$Lambda$2336.manage(Lio/javalin/Handler;Lio/javalin/Context;Ljava/util/Set;)V (Unknown Source)
  at io.javalin.Javalin.lambda$addHandler$0(Lio/javalin/Handler;Ljava/util/Set;Lio/javalin/Context;)V (Javalin.java:606)
  at io.javalin.Javalin$$Lambda$2340.handle(Lio/javalin/Context;)V (Unknown Source)
  at io.javalin.core.JavalinServlet$service$2$1.invoke()V (JavalinServlet.kt:46)
  at io.javalin.core.JavalinServlet$service$2$1.invoke()Ljava/lang/Object; (JavalinServlet.kt:17)
  at io.javalin.core.JavalinServlet$service$1.invoke(Lkotlin/jvm/functions/Function0;)V (JavalinServlet.kt:143)
  at io.javalin.core.JavalinServlet$service$2.invoke()V (JavalinServlet.kt:41)
  at io.javalin.core.JavalinServlet.service(Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (JavalinServlet.kt:107)
  at io.javalin.core.util.JettyServerUtil$initialize$httpHandler$1.doHandle(Ljava/lang/String;Lorg/apache/hudi/org/eclipse/jetty/server/Request;Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (JettyServerUtil.kt:72)
  at org.apache.hudi.org.eclipse.jetty.server.handler.ScopedHandler.nextScope(Ljava/lang/String;Lorg/apache/hudi/org/eclipse/jetty/server/Request;Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (ScopedHandler.java:203)
  at org.apache.hudi.org.eclipse.jetty.servlet.ServletHandler.doScope(Ljava/lang/String;Lorg/apache/hudi/org/eclipse/jetty/server/Request;Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (ServletHandler.java:480)
  at org.apache.hudi.org.eclipse.jetty.server.session.SessionHandler.doScope(Ljava/lang/String;Lorg/apache/hudi/org/eclipse/jetty/server/Request;Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (SessionHandler.java:1668)
  at org.apache.hudi.org.eclipse.jetty.server.handler.ScopedHandler.nextScope(Ljava/lang/String;Lorg/apache/hudi/org/eclipse/jetty/server/Request;Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (ScopedHandler.java:201)
  at org.apache.hudi.org.eclipse.jetty.server.handler.ContextHandler.doScope(Ljava/lang/String;Lorg/apache/hudi/org/eclipse/jetty/server/Request;Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (ContextHandler.java:1247)
  at org.apache.hudi.org.eclipse.jetty.server.handler.ScopedHandler.handle(Ljava/lang/String;Lorg/apache/hudi/org/eclipse/jetty/server/Request;Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (ScopedHandler.java:144)
  at org.apache.hudi.org.eclipse.jetty.server.handler.HandlerList.handle(Ljava/lang/String;Lorg/apache/hudi/org/eclipse/jetty/server/Request;Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (HandlerList.java:61)
  at org.apache.hudi.org.eclipse.jetty.server.handler.StatisticsHandler.handle(Ljava/lang/String;Lorg/apache/hudi/org/eclipse/jetty/server/Request;Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (StatisticsHandler.java:174)
  at org.apache.hudi.org.eclipse.jetty.server.handler.HandlerWrapper.handle(Ljava/lang/String;Lorg/apache/hudi/org/eclipse/jetty/server/Request;Ljavax/servlet/http/HttpServletRequest;Ljavax/servlet/http/HttpServletResponse;)V (HandlerWrapper.java:132)
  at org.apache.hudi.org.eclipse.jetty.server.Server.handle(Lorg/apache/hudi/org/eclipse/jetty/server/HttpChannel;)V (Server.java:502)
  at org.apache.hudi.org.eclipse.jetty.server.HttpChannel.handle()Z (HttpChannel.java:370)
  at org.apache.hudi.org.eclipse.jetty.server.HttpConnection.onFillable()V (HttpConnection.java:267)
  at org.apache.hudi.org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded()V (AbstractConnection.java:305)
  at org.apache.hudi.org.eclipse.jetty.io.FillInterest.fillable()Z (FillInterest.java:103)
  at org.apache.hudi.org.eclipse.jetty.io.ChannelEndPoint$2.run()V (ChannelEndPoint.java:117)
  at org.apache.hudi.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(Ljava/lang/Runnable;)V (EatWhatYouKill.java:333)
  at org.apache.hudi.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(Z)Z (EatWhatYouKill.java:310)
  at org.apache.hudi.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(Z)V (EatWhatYouKill.java:168)
  at org.apache.hudi.org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run()V (EatWhatYouKill.java:126)
  at org.apache.hudi.org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run()V (ReservedThreadExecutor.java:366)
  at org.apache.hudi.org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(Ljava/lang/Runnable;)V (QueuedThreadPool.java:765)
  at org.apache.hudi.org.eclipse.jetty.util.thread.QueuedThreadPool$2.run()V (QueuedThreadPool.java:683)
  at java.lang.Thread.run()V (Thread.java:750)
gudladona commented 1 year ago

We may have some indicators of what is causing this problem.

We have a small file limit of 100 MB, and this appears to work well (it makes larger files and cleans up smaller ones) for an average partition that meets the size requirements.

However, for a partition that is very busy/high volume, it seems to over-bucket the inserts into many files: based on the average record size and the size of the new inserts, it would always exceed the file size limit, causing it to write to a new file group.

For example, here is the number of file groups written per instant (commit) in this partition:

aws s3 ls s3://<prefix>/<table>/<tenant>/date=20230316/ | awk -F _ '{print $3}' | sort | uniq -c | sort -nk1  | tail
 167 20230316203454183.parquet
 168 20230316195218670.parquet
 168 20230316201208079.parquet
 170 20230316200728433.parquet
 175 20230316210557345.parquet
 180 20230316130454342.parquet
 182 20230316212237421.parquet
 211 20230316192405566.parquet
 245 20230316210251305.parquet
 263 20230316204926437.parquet

As we can see here, the sheer number of small files in this partition produces a huge JSON response from the driver, thereby triggering OOM errors.

We need help figuring out how to tune this.
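The over-bucketing described above can be put into a back-of-the-envelope model. This is a hedged, simplified sketch, not Hudi's actual UpsertPartitioner logic; the function name and all numbers are illustrative assumptions:

```python
def estimate_new_file_groups(insert_bytes, avg_record_size,
                             max_file_size, small_file_free_bytes=0):
    """Illustrative model: how many new file groups a batch of inserts
    spills into, once existing small files below the limit are full."""
    records = insert_bytes // avg_record_size
    # Records that still fit into existing small files under the limit.
    absorbed = small_file_free_bytes // avg_record_size
    remaining = max(0, records - absorbed)
    # Capacity of one new file group, in records.
    per_file = max(1, max_file_size // avg_record_size)
    # Ceiling division: each new file group holds at most per_file records.
    return -(-remaining // per_file)

# A busy partition: 30 GB of inserts, ~1 KB records, 100 MB file size limit,
# and no free room left in existing small files.
print(estimate_new_file_groups(30 * 1024**3, 1024, 100 * 1024**2))  # → 308
```

Under these assumed numbers a single busy commit already creates hundreds of file groups, which is consistent with the per-instant counts in the listing above.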

nfarah86 commented 1 year ago

what are the memory configs you're using?

gudladona commented 1 year ago

> what are the memory configs you're using?

This failed on the driver even with a max heap of 32 GB. The threshold for failure is relative to the number of files in the partition.
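The stack traces in this thread point at String.getBytes inside Context.result, i.e. the timeline server materializing the whole file-system-view response as one string. A hedged back-of-the-envelope (the per-slice size is an assumption, not a measurement) shows why no fixed heap is safe:

```python
def response_bytes(num_file_slices, bytes_per_slice_json=2048):
    """Illustrative: response size grows linearly with file slices if each
    serialized slice entry (paths, instant times, sizes, ...) averages ~2 KB."""
    return num_file_slices * bytes_per_slice_json

# ~8 million file slices at an assumed ~2 KB each is already ~16 GB for a
# single string, before String.getBytes copies it into a byte[] as well.
print(response_bytes(8_000_000))  # → 16384000000
```

Whatever the real per-slice size is, the linear growth means any fixed driver heap is eventually exceeded as the partition accumulates files.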

nsivabalan commented 1 year ago

We attempted a fix in https://github.com/apache/hudi/pull/8480. Let us know if this helps solve the issue.

TarunMootala commented 1 week ago

I'm facing the exact same OutOfMemoryError with the cleaner. Is there a fix or workaround?

Hudi version: 0.12.1
Spark version: 3.3.0
Glue version: 4.0
Spark Streaming batch interval: 120 seconds

Write Configs:

        write_options = {
            "hoodie.table.name": args["table_name"],
            "hoodie.datasource.write.keygenerator.type": "COMPLEX",
            "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
            "hoodie.datasource.write.partitionpath.field": "entity_name",
            "hoodie.datasource.write.recordkey.field": "partition_key,sequence_number",
            "hoodie.datasource.write.precombine.field": "approximate_arrival_timestamp",
            "hoodie.datasource.write.operation": "insert",
            "hoodie.insert.shuffle.parallelism": 10,
            "hoodie.bulkinsert.shuffle.parallelism": 10,
            "hoodie.upsert.shuffle.parallelism": 10,
            "hoodie.delete.shuffle.parallelism": 10,
            "hoodie.metadata.enable": "false",
            "hoodie.datasource.hive_sync.use_jdbc": "false",
            "hoodie.datasource.hive_sync.enable": "false",
            "hoodie.keep.min.commits": 450,  
            "hoodie.keep.max.commits": 465, 
            "hoodie.cleaner.commits.retained": 449,
        }
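One thing worth noting about the retention settings above (hedged arithmetic from the posted configs, assuming roughly one commit per streaming batch): retaining 449 commits at a 120-second batch interval keeps about 15 hours of commits on the active timeline, and every retained commit adds file-slice metadata the embedded timeline server must serialize.

```python
# Rough span of the active timeline implied by the configs above,
# assuming one commit per 120 s streaming batch.
batch_interval_s = 120
commits_retained = 449          # hoodie.cleaner.commits.retained
retained_hours = commits_retained * batch_interval_s / 3600
print(retained_hours)  # → 14.966666666666667 (about 15 hours)
```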

Error:

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.lang.StringCoding.encode(StringCoding.java:350) ~[?:1.8.0_422]
    at java.lang.String.getBytes(String.java:941) ~[?:1.8.0_422]
    at io.javalin.Context.result(Context.kt:364) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.timeline.service.RequestHandler.writeValueAsStringSync(RequestHandler.java:209) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.timeline.service.RequestHandler.writeValueAsString(RequestHandler.java:175) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.timeline.service.RequestHandler.lambda$registerFileSlicesAPI$18(RequestHandler.java:383) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.timeline.service.RequestHandler$$Lambda$3772/474269844.handle(Unknown Source) ~[?:?]
    at org.apache.hudi.timeline.service.RequestHandler$ViewHandler.handle(RequestHandler.java:500) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at io.javalin.security.SecurityUtil.noopAccessManager(SecurityUtil.kt:22) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at io.javalin.Javalin$$Lambda$3752/1708997269.manage(Unknown Source) ~[?:?]
    at io.javalin.Javalin.lambda$addHandler$0(Javalin.java:606) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at io.javalin.Javalin$$Lambda$3756/1572187887.handle(Unknown Source) ~[?:?]
    at io.javalin.core.JavalinServlet$service$2$1.invoke(JavalinServlet.kt:46) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at io.javalin.core.JavalinServlet$service$2$1.invoke(JavalinServlet.kt:17) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at io.javalin.core.JavalinServlet$service$1.invoke(JavalinServlet.kt:143) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at io.javalin.core.JavalinServlet$service$2.invoke(JavalinServlet.kt:41) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at io.javalin.core.JavalinServlet.service(JavalinServlet.kt:107) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at io.javalin.core.util.JettyServerUtil$initialize$httpHandler$1.doHandle(JettyServerUtil.kt:72) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1668) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.handler.HandlerList.handle(HandlerList.java:61) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.Server.handle(Server.java:502) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.HttpChannel.handle(HttpChannel.java:370) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.server.HttpConnection.onFillable(HttpConnection.java:267) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
    at org.apache.hudi.org.apache.jetty.io.FillInterest.fillable(FillInterest.java:103) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]

Note:

This works if I manually archive a few of the oldest Hudi commit metadata (*.commit) files and reduce the number of commits retained. However, this is a tactical fix, and the same issue recurs after a few days.

TarunMootala commented 1 week ago

@gudladona

What did you do to handle this issue?