delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, along with APIs for multiple languages
https://delta.io
Apache License 2.0

[BUG] VACUUM command on Delta OSS via python -> Exception #1543

Open AydinChavez opened 1 year ago

AydinChavez commented 1 year ago

Bug

Describe the problem

I was playing around a bit to introduce myself to Delta OSS and Python. When running the VACUUM command, I get a threading-related exception.

Steps to reproduce

test.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta import *

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Example"
    master = "local[*]"

    # Create Spark session with the Delta Lake extension enabled

    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)

    #spark = configure_spark_with_delta_pip(builder).getOrCreate()
    spark = builder.getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    # Create a DataFrame
    df = spark.range(1, 10)
    df = df.withColumn('value', lit('ABC'))
    df.show()

    # Save as delta table
    df.write.format('delta').mode("overwrite").save('/tmp/delta/test_table')

    df_sql = spark.sql("SELECT * from delta.`/tmp/delta/test_table` order by id")
    df_sql.show()

    df_sql = spark.sql("OPTIMIZE delta.`/tmp/delta/test_table`")
    df_sql.show()

    # this one fails!
    df_sql = spark.sql("VACUUM delta.`/tmp/delta/test_table`")
    df_sql.show()

spark-submit --packages "io.delta:delta-core_2.12:2.2.0" test.py
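
Aside: the commented-out configure_spark_with_delta_pip call in the script above should be an equivalent way to pull in the Delta jars when running the script directly with python rather than spark-submit. A minimal sketch, assuming the delta-spark pip package matching the installed Spark version is available:

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder.appName("PySpark Delta Lake Example") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .master("local[*]")

# configure_spark_with_delta_pip adds the matching delta-core artifact to
# spark.jars.packages, so no --packages flag is needed at launch time.
spark = configure_spark_with_delta_pip(builder).getOrCreate()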

Observed results

spark-submit --packages "io.delta:delta-core_2.12:2.2.0" test.py
:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/3.3.1/libexec/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/aydink/.ivy2/cache
The jars for the packages stored in: /Users/aydink/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0254e73b-e0f2-4275-8e9d-d6718adedd46;1.0
    confs: [default]
    found io.delta#delta-core_2.12;2.2.0 in central
    found io.delta#delta-storage;2.2.0 in central
    found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 89ms :: artifacts dl 3ms
    :: modules in use:
    io.delta#delta-core_2.12;2.2.0 from central in [default]
    io.delta#delta-storage;2.2.0 from central in [default]
    org.antlr#antlr4-runtime;4.8 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0254e73b-e0f2-4275-8e9d-d6718adedd46
    confs: [default]
    0 artifacts copied, 3 already retrieved (0kB/2ms)
23/01/05 21:47:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/01/05 21:47:49 INFO SparkContext: Running Spark version 3.3.1
23/01/05 21:47:49 INFO ResourceUtils: ==============================================================
23/01/05 21:47:49 INFO ResourceUtils: No custom resources configured for spark.driver.
23/01/05 21:47:49 INFO ResourceUtils: ==============================================================
23/01/05 21:47:49 INFO SparkContext: Submitted application: PySpark Delta Lake Example
23/01/05 21:47:49 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/01/05 21:47:49 INFO ResourceProfile: Limiting resource is cpu
23/01/05 21:47:49 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/01/05 21:47:49 INFO SecurityManager: Changing view acls to: aydink
23/01/05 21:47:49 INFO SecurityManager: Changing modify acls to: aydink
23/01/05 21:47:49 INFO SecurityManager: Changing view acls groups to:
23/01/05 21:47:49 INFO SecurityManager: Changing modify acls groups to:
23/01/05 21:47:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(aydink); groups with view permissions: Set(); users  with modify permissions: Set(aydink); groups with modify permissions: Set()
23/01/05 21:47:49 INFO Utils: Successfully started service 'sparkDriver' on port 52664.
23/01/05 21:47:49 INFO SparkEnv: Registering MapOutputTracker
23/01/05 21:47:49 INFO SparkEnv: Registering BlockManagerMaster
23/01/05 21:47:49 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/01/05 21:47:49 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/01/05 21:47:49 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/01/05 21:47:49 INFO DiskBlockManager: Created local directory at /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/blockmgr-d1d75f57-9346-4337-8571-32829b56fdcd
23/01/05 21:47:49 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
23/01/05 21:47:49 INFO SparkEnv: Registering OutputCommitCoordinator
23/01/05 21:47:49 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/01/05 21:47:49 INFO SparkContext: Added JAR file:///Users/aydink/.ivy2/jars/io.delta_delta-core_2.12-2.2.0.jar at spark://192.168.2.114:52664/jars/io.delta_delta-core_2.12-2.2.0.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO SparkContext: Added JAR file:///Users/aydink/.ivy2/jars/io.delta_delta-storage-2.2.0.jar at spark://192.168.2.114:52664/jars/io.delta_delta-storage-2.2.0.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO SparkContext: Added JAR file:///Users/aydink/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar at spark://192.168.2.114:52664/jars/org.antlr_antlr4-runtime-4.8.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO SparkContext: Added file file:///Users/aydink/.ivy2/jars/io.delta_delta-core_2.12-2.2.0.jar at file:///Users/aydink/.ivy2/jars/io.delta_delta-core_2.12-2.2.0.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO Utils: Copying /Users/aydink/.ivy2/jars/io.delta_delta-core_2.12-2.2.0.jar to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/io.delta_delta-core_2.12-2.2.0.jar
23/01/05 21:47:49 INFO SparkContext: Added file file:///Users/aydink/.ivy2/jars/io.delta_delta-storage-2.2.0.jar at file:///Users/aydink/.ivy2/jars/io.delta_delta-storage-2.2.0.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO Utils: Copying /Users/aydink/.ivy2/jars/io.delta_delta-storage-2.2.0.jar to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/io.delta_delta-storage-2.2.0.jar
23/01/05 21:47:49 INFO SparkContext: Added file file:///Users/aydink/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar at file:///Users/aydink/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO Utils: Copying /Users/aydink/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/org.antlr_antlr4-runtime-4.8.jar
23/01/05 21:47:49 INFO Executor: Starting executor ID driver on host 192.168.2.114
23/01/05 21:47:49 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
23/01/05 21:47:49 INFO Executor: Fetching file:///Users/aydink/.ivy2/jars/io.delta_delta-storage-2.2.0.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO Utils: /Users/aydink/.ivy2/jars/io.delta_delta-storage-2.2.0.jar has been previously copied to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/io.delta_delta-storage-2.2.0.jar
23/01/05 21:47:49 INFO Executor: Fetching file:///Users/aydink/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO Utils: /Users/aydink/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar has been previously copied to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/org.antlr_antlr4-runtime-4.8.jar
23/01/05 21:47:49 INFO Executor: Fetching file:///Users/aydink/.ivy2/jars/io.delta_delta-core_2.12-2.2.0.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO Utils: /Users/aydink/.ivy2/jars/io.delta_delta-core_2.12-2.2.0.jar has been previously copied to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/io.delta_delta-core_2.12-2.2.0.jar
23/01/05 21:47:49 INFO Executor: Fetching spark://192.168.2.114:52664/jars/org.antlr_antlr4-runtime-4.8.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO TransportClientFactory: Successfully created connection to /192.168.2.114:52664 after 8 ms (0 ms spent in bootstraps)
23/01/05 21:47:49 INFO Utils: Fetching spark://192.168.2.114:52664/jars/org.antlr_antlr4-runtime-4.8.jar to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/fetchFileTemp15305034300907488542.tmp
23/01/05 21:47:49 INFO Utils: /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/fetchFileTemp15305034300907488542.tmp has been previously copied to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/org.antlr_antlr4-runtime-4.8.jar
23/01/05 21:47:49 INFO Executor: Adding file:/private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/org.antlr_antlr4-runtime-4.8.jar to class loader
23/01/05 21:47:49 INFO Executor: Fetching spark://192.168.2.114:52664/jars/io.delta_delta-core_2.12-2.2.0.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO Utils: Fetching spark://192.168.2.114:52664/jars/io.delta_delta-core_2.12-2.2.0.jar to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/fetchFileTemp5578658744473457225.tmp
23/01/05 21:47:49 INFO Utils: /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/fetchFileTemp5578658744473457225.tmp has been previously copied to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/io.delta_delta-core_2.12-2.2.0.jar
23/01/05 21:47:49 INFO Executor: Adding file:/private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/io.delta_delta-core_2.12-2.2.0.jar to class loader
23/01/05 21:47:49 INFO Executor: Fetching spark://192.168.2.114:52664/jars/io.delta_delta-storage-2.2.0.jar with timestamp 1672951669142
23/01/05 21:47:49 INFO Utils: Fetching spark://192.168.2.114:52664/jars/io.delta_delta-storage-2.2.0.jar to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/fetchFileTemp6019150441707736184.tmp
23/01/05 21:47:49 INFO Utils: /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/fetchFileTemp6019150441707736184.tmp has been previously copied to /private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/io.delta_delta-storage-2.2.0.jar
23/01/05 21:47:49 INFO Executor: Adding file:/private/var/folders/dt/70pgzqsj1190jtrxf_mt039c0000gn/T/spark-3942792b-1421-4270-b17b-a6edd373a3ea/userFiles-b86f9746-52f3-409c-b576-cee24155c7ca/io.delta_delta-storage-2.2.0.jar to class loader
23/01/05 21:47:49 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52666.
23/01/05 21:47:49 INFO NettyBlockTransferService: Server created on 192.168.2.114:52666
23/01/05 21:47:49 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/01/05 21:47:49 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.2.114, 52666, None)
23/01/05 21:47:49 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.2.114:52666 with 434.4 MiB RAM, BlockManagerId(driver, 192.168.2.114, 52666, None)
23/01/05 21:47:49 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.2.114, 52666, None)
23/01/05 21:47:49 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.2.114, 52666, None)
+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  ABC|
|  3|  ABC|
|  4|  ABC|
|  5|  ABC|
|  6|  ABC|
|  7|  ABC|
|  8|  ABC|
|  9|  ABC|
+---+-----+

23/01/05 21:47:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/01/05 21:47:54 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  ABC|
|  3|  ABC|
|  4|  ABC|
|  5|  ABC|
|  6|  ABC|
|  7|  ABC|
|  8|  ABC|
|  9|  ABC|
+---+-----+

+--------------------+--------------------+
|                path|             metrics|
+--------------------+--------------------+
|file:/tmp/delta/t...|{1, 8, {784, 784,...|
+--------------------+--------------------+

Deleted 0 files and directories in a total of 1 directories.
+--------------------+
|                path|
+--------------------+
|file:/tmp/delta/t...|
+--------------------+

23/01/05 21:48:02 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@29643ca0 rejected from java.util.concurrent.ThreadPoolExecutor@13a2e749[Shutting down, pool size = 7, active threads = 7, queued tasks = 0, completed tasks = 11033]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:74)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/01/05 21:48:02 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@25514899 rejected from java.util.concurrent.ThreadPoolExecutor@13a2e749[Shutting down, pool size = 6, active threads = 6, queued tasks = 0, completed tasks = 11034]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:74)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/01/05 21:48:02 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@2b85a821 rejected from java.util.concurrent.ThreadPoolExecutor@13a2e749[Shutting down, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 11035]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:74)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/01/05 21:48:02 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@721a50a0 rejected from java.util.concurrent.ThreadPoolExecutor@13a2e749[Shutting down, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 11036]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:74)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/01/05 21:48:02 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@27be5327 rejected from java.util.concurrent.ThreadPoolExecutor@13a2e749[Shutting down, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 11037]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:74)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/01/05 21:48:02 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@47ae74de rejected from java.util.concurrent.ThreadPoolExecutor@13a2e749[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 11038]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:74)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/01/05 21:48:02 ERROR TaskSchedulerImpl: Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$3@30f9db09 rejected from java.util.concurrent.ThreadPoolExecutor@5fc27a9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 11038]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:61)
    at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:819)
    at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:794)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:71)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/01/05 21:48:02 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@297c12c5 rejected from java.util.concurrent.ThreadPoolExecutor@13a2e749[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 11040]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:74)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/01/05 21:48:02 ERROR TaskSchedulerImpl: Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$3@2a503c9c rejected from java.util.concurrent.ThreadPoolExecutor@5fc27a9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 11038]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:61)
    at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:819)
    at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:794)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:71)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)
23/01/05 21:48:02 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@6454a9cb rejected from java.util.concurrent.ThreadPoolExecutor@13a2e749[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 11040]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:74)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1589)

Expected results

The VACUUM command should complete without throwing an exception.

Environment information

OS: macOS Ventura

spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 19.0.1
Branch HEAD
Compiled by user yumwang on 2022-10-15T09:47:01Z
Revision fbbcf9434ac070dd4ced4fb9efe32899c6db12a9
Url https://github.com/apache/spark
Type --help for more information.

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

zsxwing commented 1 year ago

Thanks for reporting this. VACUUM actually succeeded, based on the following output:

Deleted 0 files and directories in a total of 1 directories.
+--------------------+
|                path|
+--------------------+
|file:/tmp/delta/t...|
+--------------------+

This looks like an issue that happens while the program is shutting down: Spark tasks were still being launched while Spark was being stopped. Could you remove spark.sparkContext.setLogLevel("WARN") so that we can get more information about why tasks were still being submitted after VACUUM?

AydinChavez commented 1 year ago

You're welcome. Sure, I removed the setLogLevel statement as mentioned and attached the log output.

output.log

zsxwing commented 1 year ago

Looks like a bug in Spark AQE. The issue goes away when spark.sql.adaptive.enabled is turned off. My hunch is that AQE launches a Spark job asynchronously but doesn't cancel it properly when the query doesn't need all of its partitions. It would be great if you could file an issue with the Spark community.
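
For reference, a minimal sketch of the workaround described above, assuming the repro script from this issue and the standard spark.sql.adaptive.enabled setting:

from pyspark.sql import SparkSession

# Workaround sketch based on the comment above: disable adaptive query
# execution so the asynchronously launched AQE job is never started.
spark = SparkSession.builder.appName("PySpark Delta Lake Example") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.adaptive.enabled", "false") \
    .master("local[*]") \
    .getOrCreate()

# Or, on an already-created session:
# spark.conf.set("spark.sql.adaptive.enabled", "false")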