apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/

[SUPPORT] Deltastreamer continuous mode failed async compaction #7524

Open dyang108 opened 1 year ago

dyang108 commented 1 year ago

Hope I can get some help with a problem I’ve been seeing in Deltastreamer, running on Mesos with an image built on docker.io/apachehudi/hudi-hadoop_2.8.4-hive_2.3.3-sparkadhoc_2.4.4:latest from Docker Hub. The job runs successfully for a few hours but then consistently fails during delta-sync. I’m reading from an Avro Kafka topic. Is there another image I should be using to ensure that Deltastreamer functions properly? Any hints on what I might be configuring wrong? I’m new to Hudi and open to all help!

I don't see anything above the async compaction failure stacktrace indicating that something went wrong earlier, but I'm also not sure what to look for.

To Reproduce

Steps to reproduce the behavior:

Built a Docker image from the following Dockerfile:

FROM docker.io/apachehudi/hudi-hadoop_2.8.4-hive_2.3.3-sparkadhoc_2.4.4:latest

ARG spark_home=/opt/spark
RUN mkdir ${spark_home}/work-dir

RUN apt-get update -y && \
    apt-get install -y libsvn1 libcurl4-nss-dev libevent-dev libsasl2-modules python libnss3 curl net-tools libopenblas-dev jnettop awscli zip jq procps && \
    apt-get autoremove -y
RUN touch /usr/local/bin/systemctl && chmod +x /usr/local/bin/systemctl && \
 wget http://repos.mesosphere.com/debian/pool/main/m/mesos/mesos_1.5.0-2.0.1.debian9_amd64.deb && \
 dpkg -i --ignore-depends=default-jre,libcurl3  mesos_1.5.0-2.0.1.debian9_amd64.deb && rm mesos_1.5.0-2.0.1.debian9_amd64.deb
ENV MESOS_NATIVE_JAVA_LIBRARY /usr/local/lib/libmesos.so

RUN rm /var/lib/dpkg/info/mesos.list
RUN apt-get purge mesos -y

RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /opt/spark/jars/aws-java-sdk-1.7.4.jar
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar -o /opt/spark/jars/hadoop-aws-2.7.3.jar
RUN curl https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.11/0.12.1/hudi-utilities-bundle_2.11-0.12.1.jar -o ${spark_home}/work-dir/hudi-utilities-bundle_2.11-0.12.1.jar

ADD run.sh /

ENTRYPOINT /run.sh
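
The image is built and pushed in the usual way; a minimal sketch, with a placeholder registry and tag (not the names actually used here):

# Placeholder image name and tag; substitute whatever the Mesos deployment expects.
docker build -t my-registry/hudi-deltastreamer-mesos:0.12.1 .
docker push my-registry/hudi-deltastreamer-mesos:0.12.1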

Running with hadoop-aws and writing to S3, with the following spark-submit command in run.sh:

spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--master mesos://zk://10.0.1.0:2181,10.0.5.0:2181,10.0.9.0:2181/mesos/test/test \
--deploy-mode client \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
--conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dbfs/cluster-logs/heap-dumps/" \
--conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dbfs/cluster-logs/heap-dumps/" \
--conf spark.master.rest.enabled=true \
--conf spark.mesos.uris=s3a://some-bucket/spark/job-service-confs/22b1f337c68605de3e90c90190197de6.conf \
--conf spark.mesos.executor.docker.image=docker.io/apachehudi/hudi-hadoop_2.8.4-hive_2.3.3-sparkadhoc_2.4.4:latest \
--conf spark.mesos.executor.home=/opt/spark \
--conf spark.cores.max=48 \
--conf spark.executor.cores=8 \
--conf spark.executor.memory=16G \
--conf spark.driver.memory=16G \
--jars /opt/spark/jars/* /opt/spark/work-dir/hudi-utilities-bundle_2.11-0.12.1.jar \
--props /mnt/mesos/sandbox/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--target-base-path "s3a://mlmodels/photo-activity-data13/hudi" \
--target-table "photo_activity_data" \
--op "UPSERT" \
--source-ordering-field "ts" \
--table-type "MERGE_ON_READ" \
--source-limit 1000 \
--continuous
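
The props file at /mnt/mesos/sandbox/kafka-source.properties carries the Kafka and Hudi table settings; a minimal sketch of what it contains, with placeholder record key, partition path, topic, brokers, and schema-registry URL (the real values differ), would look something like:

# Illustrative placeholders only; not the actual values used by this job.
hoodie.datasource.write.recordkey.field=photo_id
hoodie.datasource.write.partitionpath.field=event_date
hoodie.deltastreamer.source.kafka.topic=photo-activity
hoodie.deltastreamer.schemaprovider.registry.url=http://schema-registry:8081/subjects/photo-activity-value/versions/latest
bootstrap.servers=kafka-broker:9092
auto.offset.reset=earliest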

Expected behavior

Deltastreamer in continuous mode keeps running delta-sync and async compaction without failing.

Environment Description

Hudi version : 0.12.1 (hudi-utilities-bundle_2.11)
Spark version : 2.4.4
Hive version : 2.3.3
Hadoop version : 2.8.4
Storage (HDFS/S3/GCS..) : S3 (s3a)
Running on Docker? (yes/no) : yes (on Mesos)

Additional context

MERGE_ON_READ table, UPSERT operation, reading from an Avro Kafka topic via AvroKafkaSource with SchemaRegistryProvider.

Stacktrace

22/12/19 21:00:54 ERROR deltastreamer.HoodieDeltaStreamer: Shutting down delta-sync due to exception
org.apache.hudi.exception.HoodieException: Async compaction failed.  Shutting down Delta Sync...
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:712)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
22/12/19 21:00:54 INFO deltastreamer.HoodieDeltaStreamer: Delta Sync shutdown. Error ?true
22/12/19 21:00:54 WARN deltastreamer.HoodieDeltaStreamer: Gracefully shutting down compactor
22/12/19 21:00:54 INFO deltastreamer.HoodieDeltaStreamer: DeltaSync shutdown. Closing write client. Error?true
22/12/19 21:00:54 ERROR async.HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Async compaction failed.  Shutting down Delta Sync...
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:193)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:190)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:571)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.hudi.exception.HoodieException: Async compaction failed.  Shutting down Delta Sync...
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:746)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Async compaction failed.  Shutting down Delta Sync...
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:712)
    ... 4 more
22/12/19 21:00:54 INFO lock.LockManager: Released connection created for acquiring lock
22/12/19 21:00:54 INFO transaction.TransactionManager: Transaction manager closed
22/12/19 21:00:54 INFO deltastreamer.DeltaSync: Shutting down embedded timeline server
22/12/19 21:00:54 INFO embedded.EmbeddedTimelineService: Closing Timeline server
22/12/19 21:00:54 INFO service.TimelineService: Closing Timeline Service
22/12/19 21:00:54 INFO javalin.Javalin: Stopping Javalin ...
22/12/19 21:00:54 INFO server.AbstractConnector: Stopped Spark@65bb9029{HTTP/1.1,[http/1.1]}{0.0.0.0:8090}
22/12/19 21:00:54 INFO ui.SparkUI: Stopped Spark web UI at http://ip-10-0-17-57.ec2.internal:8090/
22/12/19 21:00:54 ERROR javalin.Javalin: Javalin failed to stop gracefully
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
    at org.apache.hudi.org.eclipse.jetty.server.AbstractConnector.doStop(AbstractConnector.java:333)
    at org.apache.hudi.org.eclipse.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
    at org.apache.hudi.org.eclipse.jetty.server.ServerConnector.doStop(ServerConnector.java:248)
    at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
    at org.apache.hudi.org.eclipse.jetty.server.Server.doStop(Server.java:450)
    at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
    at io.javalin.Javalin.stop(Javalin.java:195)
    at org.apache.hudi.timeline.service.TimelineService.close(TimelineService.java:325)
    at org.apache.hudi.client.embedded.EmbeddedTimelineService.stop(EmbeddedTimelineService.java:141)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.close(DeltaSync.java:907)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.close(HoodieDeltaStreamer.java:848)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.onDeltaSyncShutdown(HoodieDeltaStreamer.java:227)
    at org.apache.hudi.async.HoodieAsyncService.lambda$shutdownCallback$0(HoodieAsyncService.java:171)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
22/12/19 21:00:54 INFO javalin.Javalin: Javalin has stopped
22/12/19 21:00:54 INFO service.TimelineService: Closed Timeline Service
22/12/19 21:00:54 INFO embedded.EmbeddedTimelineService: Closed Timeline server
22/12/19 21:00:54 INFO mesos.MesosCoarseGrainedSchedulerBackend: Shutting down all executors
22/12/19 21:00:54 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
22/12/19 21:00:55 INFO mesos.MesosCoarseGrainedSchedulerBackend: Mesos task 4 is now TASK_FINISHED
22/12/19 21:00:55 INFO mesos.MesosCoarseGrainedSchedulerBackend: Mesos task 2 is now TASK_FINISHED
22/12/19 21:00:55 INFO mesos.MesosCoarseGrainedSchedulerBackend: Mesos task 1 is now TASK_FINISHED
22/12/19 21:00:55 INFO mesos.MesosCoarseGrainedSchedulerBackend: Mesos task 3 is now TASK_FINISHED
22/12/19 21:00:55 INFO mesos.MesosCoarseGrainedSchedulerBackend: Mesos task 0 is now TASK_FINISHED
22/12/19 21:00:55 INFO mesos.MesosCoarseGrainedSchedulerBackend: Mesos task 5 is now TASK_FINISHED
I1219 21:00:55.627799    65 sched.cpp:2009] Asked to stop the driver
I1219 21:00:55.627866   147 sched.cpp:1191] Stopping framework 9dcd7947-26f3-4eb3-9ace-349103ab1b14-73491
22/12/19 21:00:55 INFO mesos.MesosCoarseGrainedSchedulerBackend: driver.run() returned with code DRIVER_STOPPED
22/12/19 21:00:55 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/12/19 21:00:55 INFO memory.MemoryStore: MemoryStore cleared
22/12/19 21:00:55 INFO storage.BlockManager: BlockManager stopped
22/12/19 21:00:55 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
22/12/19 21:00:55 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/12/19 21:00:55 INFO spark.SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Async compaction failed.  Shutting down Delta Sync...
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:195)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:190)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:571)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Async compaction failed.  Shutting down Delta Sync...
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:193)
    ... 15 more
Caused by: org.apache.hudi.exception.HoodieException: Async compaction failed.  Shutting down Delta Sync...
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:746)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Async compaction failed.  Shutting down Delta Sync...
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:712)
    ... 4 more
22/12/19 21:00:55 INFO util.ShutdownHookManager: Shutdown hook called
22/12/19 21:00:55 INFO util.ShutdownHookManager: Deleting directory /mnt/mesos/sandbox/spark-ffb8d4f5-9422-47a3-809a-efaba70f3a51
22/12/19 21:00:55 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-905af617-d555-4481-89bf-b68482bfb16e

Slack thread: https://apache-hudi.slack.com/archives/C4D716NPQ/p1671489527580939

yihua commented 1 year ago

Hi @dyang108, thanks for raising the issue. Could you search the Spark driver logs for any exception that happened earlier? The relevant exception can appear well before the async compaction failure. We need to understand what exactly caused the Deltastreamer to fail.
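
For example, something along these lines can surface the earliest errors; the stderr path is illustrative and depends on where the Mesos sandbox writes the driver log:

# Illustrative path; point this at wherever the driver's stderr actually lives.
grep -n -E "ERROR|Caused by" /mnt/mesos/sandbox/stderr | head -n 40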

dyang108 commented 1 year ago

Hey Ethan - thanks for the response. I see this exception in the driver logs before the async compaction failure:

22/12/19 20:10:33 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 6.0 (TID 6, 10.0.22.122, executor 3): org.apache.hudi.exception.HoodieException: Exception when reading log file 
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
    at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324)
    at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:198)
    at org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$57154431$1(HoodieCompactor.java:138)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Block has already been inflated
    at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:67)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:267)
    at org.apache.hudi.common.table.log.block.HoodieLogBlock.inflate(HoodieLogBlock.java:278)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.readRecordsFromBlockPayload(HoodieDataBlock.java:185)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecordIterator(HoodieDataBlock.java:147)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.getRecordsIterator(AbstractHoodieLogRecordReader.java:492)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:379)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:464)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
    ... 30 more
yihua commented 2 months ago

The "Block has already been inflated" issue is caused by a bug that is fixed by #7434.
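
Assuming the fix is included in a later utilities bundle (check the release notes to confirm which release picked up #7434), the Dockerfile line that downloads the bundle can point at that version instead, e.g.:

# Illustrative: 0.12.2 stands in for whichever release actually contains the fix from #7434.
RUN curl https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.11/0.12.2/hudi-utilities-bundle_2.11-0.12.2.jar -o ${spark_home}/work-dir/hudi-utilities-bundle_2.11-0.12.2.jar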