delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

Streaming to Delta Sink, Sharp Increase in Batch Time after ~36h Using Delta-1.0.0 #859

Open mlecuyer-nd opened 2 years ago

mlecuyer-nd commented 2 years ago

Background

Application is using Structured Streaming to do the following every batch:

Input Rate: ~2-6k records/second depending on time of day over 25 Kafka Partitions

When moving over to Delta 1.0.0, a "degraded state" was noticed after about 48 hours of run-time (since shortened to ~36 hours). In this degraded state, batch times increase significantly, but the job continues to run. This has a compounding feedback effect: as batch times increase, more data has to be read per batch, which leads to even longer batch times, which leads to more backlog of data, ad infinitum.

Expected Behavior

Observed Behavior

org.apache.spark.sql.delta.commands.VacuumCommand$.$anonfun$gc$1(VacuumCommand.scala:239)
com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
org.apache.spark.sql.delta.commands.VacuumCommand$.recordOperation(VacuumCommand.scala:49)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:106)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:91)
org.apache.spark.sql.delta.commands.VacuumCommand$.recordDeltaOperation(VacuumCommand.scala:49)
org.apache.spark.sql.delta.commands.VacuumCommand$.gc(VacuumCommand.scala:101)
io.delta.tables.execution.VacuumTableCommand.run(VacuumTableCommand.scala:69)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)

Current Debugging Done

Debugging yet-to-be-done

Environment and Configs

Node Count/Size

Storage

Functioning Environment

Non-Functioning Environment

Spark Configs

Application Configs

    .set('spark.scheduler.mode', 'FAIR') \
    .set("spark.executor.cores", CORE_VALUE) \
    .set("spark.executor.memory", MEMORY_VALUE)\
    .set('spark.dynamicAllocation.enabled', 'true')\
    .set('spark.sql.files.maxPartitionBytes', '1073741824') \
    .set('spark.driver.maxResultSize', 0) \
    .set('spark.dynamicAllocation.minExecutors','3')\
    .set('spark.executor.heartbeatInterval', '25000') \
    .set('spark.databricks.delta.vacuum.parallelDelete.enabled', 'true') \
    .set('spark.databricks.delta.retentionDurationCheck.enabled', 'false') \
    .set('spark.databricks.delta.checkpoint.partSize', '1000000') \
    .set('spark.databricks.delta.snapshotPartitions', '150')
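
For context, a minimal sketch of how a .set(...) chain like the one above is typically applied (the SparkConf/SparkSession wiring here is assumed; the report only shows the fragment):

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Illustrative wiring only -- the report shows the .set(...) calls but not the
# object they are chained on; a SparkConf passed to the session builder is one
# common way to apply them.
conf = SparkConf() \
    .set('spark.scheduler.mode', 'FAIR') \
    .set('spark.databricks.delta.vacuum.parallelDelete.enabled', 'true') \
    .set('spark.databricks.delta.snapshotPartitions', '150')
# ...plus the remaining .set(...) calls from the fragment above.

spark = SparkSession.builder.config(conf=conf).getOrCreate()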
Output Configs
            .option('maxRecordsPerFile', 3000000) \
            .option('mergeSchema', 'true') \
            .option('checkpointLocation', output_location + table_name + f'/_checkpoints/{config["source_name"]}') \
            .partitionBy('HOURLY_TIMESTAMP_FIELD') \
            .start(output_location + table_name) \
            .awaitTermination()
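
For reference, a minimal sketch of the streaming write that the option fragment above implies; the source DataFrame (stream_df) and the output mode are assumptions, not taken from the report:

# Hypothetical end-to-end write: only the .option/.partitionBy/.start/
# .awaitTermination lines above come from the report, the rest is illustrative.
query = stream_df.writeStream \
    .format('delta') \
    .outputMode('append') \
    .option('maxRecordsPerFile', 3000000) \
    .option('mergeSchema', 'true') \
    .option('checkpointLocation', output_location + table_name + f'/_checkpoints/{config["source_name"]}') \
    .partitionBy('HOURLY_TIMESTAMP_FIELD') \
    .start(output_location + table_name)

query.awaitTermination()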
Delta Table Configs
Non-Functioning Environment
        .property('delta.deletedFileRetentionDuration', '6 HOURS') \
        .property('delta.logRetentionDuration', '96 HOURS')
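
A minimal sketch of how these table properties could be attached with the DeltaTable builder API; the location, column, and partitioning below are placeholders, not the reporter's actual schema:

from delta.tables import DeltaTable

# Illustrative only: the report shows the .property(...) calls, while the rest
# of this builder chain (location, columns, partitioning) is assumed.
(
    DeltaTable.createIfNotExists(spark)
    .location(output_location + table_name)
    .addColumn('HOURLY_TIMESTAMP_FIELD', 'timestamp')
    .partitionedBy('HOURLY_TIMESTAMP_FIELD')
    .property('delta.deletedFileRetentionDuration', '6 HOURS')
    .property('delta.logRetentionDuration', '96 HOURS')
    .execute()
)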

Functioning Environment
* Default Settings

cc: @dennyglee following slack conversation
dennyglee commented 2 years ago

Thanks @mlecuyer-nd for the meticulous repro information. As you already noted in the debugging yet-to-be-done section, let's see if we can isolate whether EMR 6.3, Spark 3.1, and/or Delta Lake 1.0 is causing what appears to be a vacuum-related issue.

inh3 commented 2 years ago

Background

I am encountering the same issue as @mlecuyer-nd where significant time is being spent in a job with the description $anonfun$gc$1 at DatabricksLogging.scala:77 during vacuum.

I created a very simple example to illustrate the problem: a table with 2 rows of data that I then immediately vacuumed. During vacuuming, a Spark job was scheduled with 10K tasks and took 7 seconds. I also noticed some other jobs that seemed to have excessive task counts given the simplicity of this example.

My example is using Delta 1.1.0.

Description of the example is below.

Environment

I am using Spark in Local Mode via PySpark on an EC2 instance running Amazon Linux 2. I am using Delta with a local file-system based table. I am executing my code via a Jupyter notebook.

Spark Context Configuration

import os, sys
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages=io.delta:delta-core_2.12:1.1.0 pyspark-shell'

import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder \
    .master("local[*]") \
    .appName("VacuumBug") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config('spark.databricks.delta.retentionDurationCheck.enabled', 'false') \
    .config('spark.databricks.delta.vacuum.parallelDelete.enabled', 'true')

spark = configure_spark_with_delta_pip(builder).getOrCreate()
sc = spark.sparkContext

print('python version:', sys.version)
print('pyspark version:', sc.version)
print('java spark version:', sc._jsc.version())
print(f'Hadoop version = {sc._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}')
python version: 3.7.10 (default, Jun  3 2021, 00:02:01) 
[GCC 7.3.1 20180712 (Red Hat 7.3.1-13)]
pyspark version: 3.2.0
java spark version: 3.2.0
Hadoop version = 3.3.1

Example

This example creates a table with only two rows and three columns, one of which is the partition column.

Raw Source Data

// test_data_a.json
{
    "name": "John",
    "age": 14
}

// test_data_b.json
{
    "name": "Mary",
    "age": 77 
}

Creating the Delta Lake Table

import pyspark.sql.functions as F

TABLE_PATH = "/tmp/vacuum-bug/table"

jsonPath = "/tmp/vacuum-bug/test_data_*.json"
jsonDF = spark.read.json(jsonPath, multiLine=True)
jsonDF = jsonDF.withColumn("partitionColumn", F.lit(1))
jsonDF.printSchema()
jsonDF.show(vertical=False)

jsonDF = jsonDF.repartition(1, "partitionColumn")
print('num partitions:', jsonDF.rdd.getNumPartitions())

jsonDF \
    .write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .partitionBy("partitionColumn") \
    .save(TABLE_PATH)
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- partitionColumn: integer (nullable = false)

+---+----+---------------+
|age|name|partitionColumn|
+---+----+---------------+
| 77|Mary|              1|
| 14|John|              1|
+---+----+---------------+

num partitions: 1

Filesystem After Table Creation

/tmp/vacuum-bug % ls -l
drwxr-xr-x 4 user group 4096 Dec  8 20:52 table
-rw-r--r-- 1 user group   38 Dec  8 20:23 test_data_a.json
-rw-r--r-- 1 user group   39 Dec  8 20:23 test_data_b.json

/tmp/vacuum-bug % ls -al ./table/partitionColumn=1
drwxr-xr-x 2 user group 4096 Dec  8 20:52 .
drwxr-xr-x 4 user group 4096 Dec  8 20:52 ..
-rw-r--r-- 1 user group  719 Dec  8 20:52 part-00000-ce61e66d-f0ef-4133-a07e-b89c7d10569c.c000.snappy.parquet
-rw-r--r-- 1 user group   16 Dec  8 20:52 .part-00000-ce61e66d-f0ef-4133-a07e-b89c7d10569c.c000.snappy.parquet.crc

/tmp/vacuum-bug % ls -al ./table/_delta_log
drwxr-xr-x 2 user group 4096 Dec  8 20:52 .
drwxr-xr-x 4 user group 4096 Dec  8 20:52 ..
-rw-r--r-- 1 user group 1067 Dec  8 20:52 00000000000000000000.json
-rw-r--r-- 1 user group   20 Dec  8 20:52 .00000000000000000000.json.crc

Reading Table

deltaTable = DeltaTable.forPath(spark, TABLE_PATH)
deltaTableDF = deltaTable.toDF()

print('num partitions:', deltaTableDF.rdd.getNumPartitions())
deltaTableDF.show(vertical=False)
num partitions: 1
+---+----+---------------+
|age|name|partitionColumn|
+---+----+---------------+
| 77|Mary|              1|
| 14|John|              1|
+---+----+---------------+

Vacuum Table

Lastly, I do a vacuum. Ideally this would be a no-op, because there are no stale versions or removed files on this table.

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, TABLE_PATH)
deltaTable.vacuum(0)
/tmp/vacuum-bug % ls -al table/partitionColumn=1
drwxr-xr-x 2 user group 4096 Dec  8 20:58 .
drwxr-xr-x 4 user group 4096 Dec  8 20:58 ..
-rw-r--r-- 1 user group  719 Dec  8 20:58 part-00000-7c2ad9fb-8191-42a9-9d55-19f8c5acc040.c000.snappy.parquet
-rw-r--r-- 1 user group   16 Dec  8 20:58 .part-00000-7c2ad9fb-8191-42a9-9d55-19f8c5acc040.c000.snappy.parquet.crc

/tmp/vacuum-bug % ls -al table/_delta_log
drwxr-xr-x 2 user group 4096 Dec  8 20:58 .
drwxr-xr-x 4 user group 4096 Dec  8 20:58 ..
-rw-r--r-- 1 user group 1067 Dec  8 20:58 00000000000000000000.json
-rw-r--r-- 1 user group   20 Dec  8 20:58 .00000000000000000000.json.crc

Spark Jobs

Below is a screenshot from the Spark Web UI showing the jobs that were completed for each of the steps in this example.

[Screenshot: spark_jobs]

Problem/Issue

If you look at the jobs, it appears that for both reading and vacuuming, numerous unnecessary tasks are being spawned.

I would also like to point out a previous issue from 2020 that showed similar behavior and was closed without any investigation/resolution: #387

dennyglee commented 2 years ago

Thanks very much for providing the additional details @inh3 - we will continue the investigation as well!

dennyglee commented 2 years ago

Note, I did a quick test following @inh3's excellent instructions, and we may have two separate issues - the one @inh3 calls out (as noted in #387) and the issue that you're still facing, @mlecuyer-nd.

Test Reproduction

I followed the exact same steps as @inh3 noted, using Delta Lake 0.8.0 (Spark 3.0.0) and Delta Lake 1.1 (Spark 3.2), which matches @mlecuyer-nd's configuration. Note, I'm running this on my local box rather than an EC2 instance, but this is moot for this discussion.

Using Delta 1.1

The results are similar to yours, @inh3: following the same steps, 21 jobs are generated, including a 10s job with 10,216 tasks.

[Screenshot: 2021-12-08 at 10.51.30 PM]

Using Delta 0.8.0

What is interesting here is that, because of the recursive gc noted in #387, 210 jobs (10x) are spawned.

[Screenshot: 2021-12-08 at 10.51.01 PM]

But when diving deeper, it's still the same ~10s job with ~10,000 tasks.

[Screenshot: 2021-12-08 at 10.51.11 PM]

As was noted in #387, vacuum can be expensive because it does the following (a rough sketch follows this list):

  1. List all the files in the directory (recursively if partitioned)
  2. Find out all files needed by all the retained table versions.
  3. Join these to find which files are not needed and then delete them.
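
A rough PySpark sketch of that flow, just to make the steps concrete (this is not the actual VacuumCommand code; list_table_files and delete_file are hypothetical helpers, and path normalization and the retention-threshold check are omitted):

# 1. List all the files under the table directory (recursively for partitioned tables).
all_files_df = spark.createDataFrame(
    [(p,) for p in list_table_files(table_path)],   # hypothetical recursive lister
    ['path'],
)

# 2. Collect the files referenced by the retained table versions from the transaction log.
referenced_df = (
    spark.read.json(f'{table_path}/_delta_log/*.json')
    .where('add IS NOT NULL')
    .selectExpr('add.path AS path')
)

# 3. Anti-join: files present on disk but not referenced by any retained version
#    are deletion candidates.
stale_df = all_files_df.join(referenced_df, 'path', 'left_anti')
for row in stale_df.collect():
    delete_file(row.path)                            # hypothetical delete helper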

While I can understand the consternation over this vacuum taking longer than desired, it appears it's the same issue in terms of the many tasks / long duration for vacuum as in #387. Perhaps we can get a PR or create a separate issue for a more efficient vacuum :)

That said, @mlecuyer-nd's issue may be different in that the same code results in an extended delay on 1.0 vs. 0.8. Note, this could be an artifact of the expense of vacuum, but it should be the same (or better) on 1.0 vs. 0.8. We still have more digging to do, eh?!

yangyingjie commented 2 years ago

Same issue!!!

JassAbidi commented 2 years ago

Thank you for the detailed explanation! The 10,000+ task job is performing the recursive listing of the files/directories under the Delta table. The number of tasks is controlled by the spark.sql.sources.parallelPartitionDiscovery.parallelism Spark conf, which defaults to 10000; this explains the 10,000-task stage. @dennyglee IMHO, we should default this to a lower value than 10000. @yangyingjie this was the default behavior even before Delta 1.0.0 and should not impact your streaming query performance. Could you please provide more input about the vacuum and compaction jobs that you run concurrently with the job appending to your Delta table?

dennyglee commented 2 years ago

@JassAbidi Awesome call-out - you're right, we probably should provide better guidance on using this. For example, in my preceding Delta 1.1 test, using:

spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "10")
deltaTable.vacuum(0)

The vacuum job completed in 0.4s with 226 tasks (parallelPartitionDiscovery.parallelism: 10) vs. 8s with 10,216 tasks (parallelPartitionDiscovery.parallelism: 10000).

We would most likely call out that this change should be made just prior to the vacuum command and then reversed afterwards (i.e., go back to the default with spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "10000")) so it does not impact other Spark jobs.
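
A hedged sketch of that scope-it-to-the-vacuum pattern, reusing the deltaTable handle from the earlier example:

# Lower the listing parallelism only for the vacuum, then restore the previous
# value so other Spark jobs are unaffected (10000 is the Spark default).
key = 'spark.sql.sources.parallelPartitionDiscovery.parallelism'
previous = spark.conf.get(key, '10000')
try:
    spark.conf.set(key, '10')
    deltaTable.vacuum(0)
finally:
    spark.conf.set(key, previous)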

mlecuyer-nd commented 2 years ago

@JassAbidi That's very interesting, and thanks for the call out! While you didn't tag me, I'll provide the details of my compaction/vacuum as well, as I left that out of the original.

Here's the compaction:

spark.read.format('delta') \
                    .load(table_location) \
                    .where(part_statement) \
                    .repartition(4) \
                    .write \
                    .format('delta') \
                    .mode('overwrite') \
                    .option('replaceWhere', part_statement) \
                    .option('dataChange', 'false') \
                    .option('maxRecordsPerFile', '3000000') \
                    .save(table_location)

and the Vacuum is:

spark.sql(f"VACUUM delta.{table_location}RETAIN {24*7} hours")

EDIT: And just to add more information, we've now rolled back to Delta 0.8 for our production workload and can re-confirm the earlier behavior: it has been running for 1+ week without issue.

We're working on prioritizing the setup of testing in a lower environment for the test cases above.

dennyglee commented 2 years ago

Thanks @mlecuyer-nd - quick question: if you were to reduce the parallelism for just the vacuum job, I'm wondering whether the result differs between Delta 0.8 and Delta 1.0?

dennyglee commented 2 years ago

Including #886 as a potential solution to this

mlecuyer-nd commented 2 years ago

@dennyglee Sorry, I haven't had time to run those test cases for you yet. I attempted to remove the vacuuming for a test case before the holidays, but the upstream source wasn't stable enough (it was based on S3 replication of the Delta table into a dev environment).

Will hopefully have time to test #886 once that change goes live.

dennyglee commented 2 years ago

Thanks @mlecuyer-nd - all good and you don't need to apologize! Sure, thanks to @JassAbidi for creating #886 and hopefully that will address your issues. Thanks!

pedrosalgadowork commented 2 years ago

Same here... any news on how to solve/mitigate this? Thanks!

dennyglee commented 2 years ago

Hi @pedrosalgadowork - as we continue working on this - please check out https://github.com/delta-io/delta/issues/859#issuecomment-995454556 on a potential mitigation. Thanks!

pedrosalgadowork commented 2 years ago

> Hi @pedrosalgadowork - as we continue working on this - please check out #859 (comment) on a potential mitigation. Thanks!

Thanks @dennyglee!