apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.45k stars 2.43k forks source link

[SUPPORT] Strange Behaviour of Deltastreamer Utility on EMR with YARN Scheduler #9341

Open danielfordfc opened 1 year ago

danielfordfc commented 1 year ago

Describe the problem you faced

Apache Hudi's deltastreamer utility on EMR, with YARN as the scheduler. I've noticed a rather peculiar behaviour that has started causing sporadic errors in my jobs.

When an EMR job is cancelled either manually by the user or programmatically (in my case, through Airflow), it seems that occasionally YARN doesn't receive the termination message and continues running the job in the background. This behavior is quite misleading, as the EMR Console indicates the job has been stopped, while in reality, it is still running on the cluster.

The issue is further complicated when a cancelled job is rerun. It appears that in these scenarios, two instances of the deltastreamer job run concurrently, without awareness of each other. Considering the nature of deltastreamer, which is designed to run as a single process for a given Hudi table, this parallel execution leads to several inconsistencies and problems.

This is a known problem with AWS, and we have opened up a support ticket with them previously... Their main reply can be found in the additional context section below

Symptoms

The manifestation of this issue is quite unpredictable, but some symptoms include:

These symptoms have led to sporadic failures of the jobs and have made our process quite unstable.

For instance, please see a Single deltastreamer ingestion commit timeline, vs., when we have this issue. Jobs spuriously fail out between minutes and hours from starting up, when running the deltastreamer in --continuous mode

Screenshot 2023-07-25 at 13 53 23 Screenshot 2023-07-25 at 13 58 00

Reproduction Steps

Expected behavior

The deltastreamer is somehow aware of the duplicate runs going on? I'm not even 100% certain that that IS whats happening..

Environment Description

emr-6.8.0 S3 Storage Syncing Hive to Glue Catalog through Hudi Deltastreamer.

Additional Context

original slack thread about the issue: https://apache-hudi.slack.com/archives/C4D716NPQ/p1690212730633249

AWS Support ticket response after a couple calls with them and back and forth communications:

"As discussed, I understood that you have cancelled running steps in EMR Cluster from EMR console and the steps are still running in the background.

I checked further at my end and I can see that multiple customers have reported this issue that upon cancelling any Running EMR step, the step shows CANCELLED on the console even though the step/related application is running at background. AWS EMR service team has become aware of this issue and confirmed this is a bug. They started working on it actively to fix it soon. But unfortunately, there is no ETA available on this yet.

Hence currently as given here:[1] the best way to stop the running step/application would be to kill it using the application ID (for YARN steps) which you're already aware of."

Sample Deltastreamer spark-submit command (formatted for readability)

"name": [
    {
        "Name": "name",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--master", "yarn",
                "--deploy-mode", "cluster",
                "--executor-memory", "1g",
                "--driver-memory", "2g",
                "--num-executors", "2",
                "--executor-cores","2",
                "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
                "--conf", "spark.dynamicAllocation.enabled=false",
                "--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
                "--conf", "spark.sql.catalogImplementation=hive",
                "--conf", "spark.sql.hive.convertMetastoreParquet=false",
                "--conf", "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
                "--conf", "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
                "--conf", "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                "--conf", "spark.streaming.kafka.allowNonConsecutiveOffsets=true",
                "--conf", "spark.hadoop.parquet.avro.write-old-list-structure=false",
                # IMPORTANT: hudi-utilities-bundle must be declared immediately before any Hudi spark commands
                "/usr/lib/hudi/hudi-utilities-bundle.jar",
                "--source-class", "org.apache.hudi.utilities.sources.AvroKafkaSource",
                "--table-type", "COPY_ON_WRITE",
                "--op", "INSERT",
                "--enable-sync",
                "--continuous",
                # Hudi write config
                "--target-base-path", f"s3://{bucket}/raw/table_name",
                "--target-table", "table_name",
                "--hoodie-conf", "hoodie.merge.allow.duplicate.on.inserts=true",
                "--hoodie-conf", "hoodie.database.name=table_name",
                "--hoodie-conf", "hoodie.table.name=table_name",
                "--hoodie-conf", "hoodie.datasource.write.recordkey.field=uuid",
                "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator",
                "--hoodie-conf", "hoodie.datasource.write.partitionpath.field=updated_at",
                "--hoodie-conf", "hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS",
                "--hoodie-conf", "hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd",
                "--hoodie-conf", "hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true",
                "--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer",
                "--hoodie-conf", "hoodie.write.markers.type=DIRECT",
                # AWS Glue Data Catalog config
                "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
                "--hoodie-conf", "hoodie.datasource.hive_sync.database=glue_db_name",
                "--hoodie-conf", "hoodie.datasource.hive_sync.table=table_name",
                "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=_event_date",
                # Hudi Metrics
                "--hoodie-conf", "hoodie.metrics.on=true",
                "--hoodie-conf", "hoodie.metrics.reporter.type=CLOUDWATCH",
                "...... and a bunch of kafka and SR connectivity args...",
            ]
        }
    }

Stacktraces

Some of the stack traces we've seen highly likely due to this issue.

  1. It seems to want to clean up an already deleted file.

    Caused by: org.apache.hudi.exception.HoodieException: java.io.FileNotFoundException: No such file or directory 's3://path-1/raw/table_name/2023/07/21/59c7616d-e78a-4d65-830d-6080ccba3098-0_0-88-78_20230721082240373.parquet'
    at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    ... 29 more
  2. Failure to rollback error

    Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Failed to rollback s3://path1/raw/table_name commits 20230721073522498
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:190)
    ... 8 more
  3. Concurrent Glue Sync Error

23/07/07 21:03:23 ERROR Client: Application diagnostics message: User class threw exception: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:192) at org.apache.hudi.common.util.Option.ifPresent(Option.java:97) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:187) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:555) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:742) Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:190) ... 8 more Caused by: org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:715) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:61) at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncMeta(DeltaSync.java:715) at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:634) at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:333) at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:681) ... 4 more Caused by: org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing listing_created_v2 at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:149) at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:59) ... 8 more Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to get update last commit time synced to Option{val=20230707210236484} at org.apache.hudi.hive.HoodieHiveClient.updateLastCommitTimeSynced(HoodieHiveClient.java:300) at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:241) at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:158) at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:146) ... 9 more Caused by: MetaException(message:Update table failed due to concurrent modifications. (Service: AWSGlue; Status Code: 400; Error Code: ConcurrentModificationException; Request ID: 01e53510-78d3-4d76-b091-eeb39c99625e; Proxy: null)) at com.amazonaws.glue.catalog.converters.BaseCatalogToHiveConverter.getHiveException(BaseCatalogToHiveConverter.java:112) at com.amazonaws.glue.catalog.converters.BaseCatalogToHiveConverter.wrapInHiveException(BaseCatalogToHiveConverter.java:100) at com.amazonaws.glue.catalog.metastore.GlueMetastoreClientDelegate.alterTable(GlueMetastoreClientDelegate.java:569) at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.alter_table(AWSCatalogMetastoreClient.java:407) at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.alter_table(AWSCatalogMetastoreClient.java:392) at sun.reflect.GeneratedMethodAccessor312.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2350) at com.sun.proxy.$Proxy83.alter_table(Unknown Source) at org.apache.hudi.hive.HoodieHiveClient.updateLastCommitTimeSynced(HoodieHiveClient.java:298) ... 12 more

danielfordfc commented 1 year ago

cc: @rahil-c

danielfordfc commented 1 year ago

Would this not have been a problem if we'd have configured a lock provider? https://hudi.apache.org/docs/concurrency_control

Regardless, YARN seemingly continuing to run the deltastreamer after the EMR Step has been cancelled is definitely problematic.