awslabs / aws-glue-libs

AWS Glue Libraries are additions and enhancements to Spark for ETL operations.

Distinct fails on df for 38 values or more. #127

Closed otg2 closed 2 years ago

otg2 commented 2 years ago

Hi,

We've run into a strange problem running a simple distinct query with AWS Glue in Docker. The image is glue_libs_3.0.0_image_01, available at https://hub.docker.com/layers/amazon/aws-glue-libs/glue_libs_3.0.0_image_01/images/sha256-b677a1db704027f9390684701e6d67053cd33bf57052824d63e4d4b551907c67?context=explore

We have a DataFrame with two columns, a string and an integer. Retrieving the distinct values of the string column works when the list contains 37 unique values, for example:

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
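glueContext = GlueContext(SparkContext.getOrCreate())
# The code below uses `spark`; take the session from the GlueContext in case it is not predefined.
spark = glueContext.spark_session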
codes = [
("SgfsdgBAKBCSITSEA",167),
("SLCtreteHES",19),
("SLN2YBONYCBOS",71),
("SLgf4747d2gdNYBOBOSNYC",651),
("SL4432AAEJNUSIT",61),
("DLAMAZ",654),
("DL423234AMPE",165),
("QT544ONXPgfTC",145),
("72f6756",51),
("QTC76OPA53CTGPTY1",4561),
("OhPAhSW7765LhgXR1",145),
("SBfgWAAESIT76JNU",17),
("QTdPTCONX",171),
("SBWA6345AEJNUSIT",171),
("VEJR6NYSCRLTO",11),
("VEG51h2BR22F1",13),
("VECHd1AN",112),
("SLJ4tUKEKTNJNU",1321),
("QTA3674LASJNUSIT",3211),
("QTC1HAN",145),
("I3633LGALEAYOGPS",1321),
("OPLXRASW1",1456),
("S32BA42KBCSEASIT",1321),
("SBJUKEKTNJNU",1654),
("VEJ432RNYLTOSCR",165),
("VEALAS42JNUSIT",781),
("SLA4322BCSITSEA",671),
("SLAKB3CSEASIT",6871),
("SLWAA2ESI1TJNU",781),
("ET223321-SCYGPS1",451),
("LGH1IG3HLOCINV",1654),
("QTG11UAN",154),
("ILG321ALEGPSAYO",7641),
("ET-123SCYGPSPERU",3121),
("VEPN31223WVANSEA",121),
("SLM333333333333333AYA",11),
("QTC1HAN",145),
("I3633LGALEAYOGPS",1321),
("OPLXRASW1",1456),
("S32BA42KBCSEASIT",1321),
("SBJUKEKTNJNU",1654),
("VEJ432RNYLTOSCR",165),
("VEALAS42JNUSIT",781),
("SLA4322BCSITSEA",671),
("SLAKB3CSEASIT",6871),
("SLWAA2ESI1TJNU",781),
("ET223321-SCYGPS1",451),
("LGH1IG3HLOCINV",1654),
("QTG11UAN",154),
("ILG321ALEGPSAYO",7641),
("ET-123SCYGPSPERU",3121),
("VEPN31223WVANSEA",121),
("SLM333333333333333AYA",11),
("SgfsdgBAKBCSITSEA",167),
("SLCtreteHES",19),
("SLN2YBONYCBOS",71),
("SLgf4747d2gdNYBOBOSNYC",651),
("SL4432AAEJNUSIT",61),
("DLAMAZ",654),
("DL423234AMPE",165),
("QT544ONXPgfTC",145),
("72f6756",51),
("QTC76OPA53CTGPTY1",4561),
("OhPAhSW7765LhgXR1",145),
("SBfgWAAESIT76JNU",17),
("QTdPTCONX",171),
("SBWA6345AEJNUSIT",171),
("VEJR6NYSCRLTO",11),
("VEG51h2BR22F1",13),
("VECHd1AN",112),
("SLJ4tUKEKTNJNU",1321),
("QTA3674LASJNUSIT",3211),
("QTC1HAN",145),
("I3633LGALEAYOGPS",1321),
("OPLXRASW1",1456),
("S32BA42KBCSEASIT",1321),
("SBJUKEKTNJNU",1654),
("VEJ432RNYLTOSCR",165),
("VEALAS42JNUSIT",781),
("SLA4322BCSITSEA",671),
("SLAKB3CSEASIT",6871),
("SLWAA2ESI1TJNU",781),
("ET223321-SCYGPS1",451),
("LGH1IG3HLOCINV",1654),
("QTG11UAN",154),
("ILG321ALEGPSAYO",7641),
("ET-123SCYGPSPERU",3121),
("VEPN31223WVANSEA",121),
("SLM333333333333333AYA",11),
("QTC1HAN",145),
("I3633LGALEAYOGPS",1321),
("OPLXRASW1",1456),
("S32BA42KBCSEASIT",1321),
("SBJUKEKTNJNU",1654),
("VEJ432RNYLTOSCR",165),
("VEALAS42JNUSIT",781),
("SLA4322BCSITSEA",671),
("SLAKB3CSEASIT",6871),
("SLWAA2ESI1TJNU",781),
("ET223321-SCYGPS1",451),
("LGH1IG3HLOCINV",1654),
("QTG11UAN",154),
("ILG321ALEGPSAYO",7641),
("ET-123SCYGPSPERU",3121),
("VEPN31223WVANSEA",121),
("SLM333333333333333AYA",11),
("SgfsdgBAKBCSITSEA",167),
("SLCtreteHES",19),
("SLN2YBONYCBOS",71),
("SLgf4747d2gdNYBOBOSNYC",651),
("SL4432AAEJNUSIT",61),
("DLAMAZ",654),
("DL423234AMPE",165),
("QT544ONXPgfTC",145),
("72f6756",51),
("QTC76OPA53CTGPTY1",4561),
("OhPAhSW7765LhgXR1",145),
("SBfgWAAESIT76JNU",17),
("QTdPTCONX",171),
("SBWA6345AEJNUSIT",171),
("VEJR6NYSCRLTO",11),
("VEG51h2BR22F1",13),
("VECHd1AN",112),
("SLJ4tUKEKTNJNU",1321),
("QTA3674LASJNUSIT",3211),
("QTC1HAN",145),
("I3633LGALEAYOGPS",1321),
("OPLXRASW1",1456),
("S32BA42KBCSEASIT",1321),
("SBJUKEKTNJNU",1654),
("VEJ432RNYLTOSCR",165),
("VEALAS42JNUSIT",781),
("SLA4322BCSITSEA",671),
("SLAKB3CSEASIT",6871),
("SLWAA2ESI1TJNU",781),
("ET223321-SCYGPS1",451),
("LGH1IG3HLOCINV",1654),
("QTG11UAN",154),
("ILG321ALEGPSAYO",7641),
("ET-123SCYGPSPERU",3121),
("VEPN31223WVANSEA",121),
("SLM333333333333333AYA",11),
("QTC1HAN",145),
("I3633LGALEAYOGPS",1321),
("OPLXRASW1",1456),
("S32BA42KBCSEASIT",1321),
("SBJUKEKTNJNU",1654),
("VEJ432RNYLTOSCR",165),
("VEALAS42JNUSIT",781),
("SLA4322BCSITSEA",671),
("SLAKB3CSEASIT",6871),
("SLWAA2ESI1TJNU",781),
("ET223321-SCYGPS1",451),
("LGH1IG3HLOCINV",1654),
("QTG11UAN",154),
("ILG321ALEGPSAYO",7641),
("ILG321ALEGPSAYO",7641),
("ET-123SCYGPSPERU",3121),
("VEPN31223WVANSEA",121),
("SLM333333333333333AYA",11),
("QTC1HAN",145),
("I3633LGALEAYOGPS",1321),
("OPLXRASW1",1456),
("S32BA42KBCSEASIT",1321),
("SBJUKEKTNJNU",1654),
("VEJ432RNYLTOSCR",165),
("VEALAS42JNUSIT",781),
("SLA4322BCSITSEA",671),
("SLAKB3CSEASIT",6871),
("SLWAA2ESI1TJNU",781),
("ET223321-SCYGPS1",451),
("LGH1IG3HLOCINV",1654),
("QTG11UAN",154),
("ILG321ALEGPSAYO",7641),
("ILG321ALEGPSAYO",7641),
("ET-123SCYGPSPERU",3121),
("VEPN31223WVANSEA",121),
("SLM333333333333333AYA",11),
("QTC1HAN",145),
("I3633LGALEAYOGPS",1321),
("OPLXRASW1",1456),
("S32BA42KBCSEASIT",1321),
("SBJUKEKTNJNU",1654),
("VEJ432RNYLTOSCR",165),
("VEALAS42JNUSIT",781),
("SLA4322BCSITSEA",671),
("SLAKB3CSEASIT",6871),
("SLWAA2ESI1TJNU",781),
("ET223321-SCYGPS1",451),
("LGH1IG3HLOCINV",1654),
("QTG11UAN",154),
("ILG321ALEGPSAYO",7641),
("ET-123SCYGPSPERU",3121),
("VEPN31223WVANSEA",121),
("SLM333333333333333AYA",11),
("SLJ13UKEJNUKTN",14)
]

codeColumns = ["code_id", "numb"]
codeDF = spark.createDataFrame(data=codes, schema = codeColumns)
codeDF.printSchema()
distinctVals = codeDF.select("code_id").distinct()
print(distinctVals.count())
distinctVals.show()

This will print out

    root
     |-- code_id: string (nullable = true)
     |-- numb: long (nullable = true)

    37
    +--------------------+
    |             code_id|
    +--------------------+
    |    ET223321-SCYGPS1|
    |      SLWAA2ESI1TJNU|
    |       VEG51h2BR22F1|
    |      SLJ4tUKEKTNJNU|
    |           QTdPTCONX|
    |      LGH1IG3HLOCINV|
    |       SLAKB3CSEASIT|
    |SLM33333333333333...|
    |SLgf4747d2gdNYBOB...|
    |       SLN2YBONYCBOS|
    |         SLCtreteHES|
    |    S32BA42KBCSEASIT|
    |     VEJ432RNYLTOSCR|
    |      SLJ13UKEJNUKTN|
    |     ILG321ALEGPSAYO|
    |             QTC1HAN|
    |           OPLXRASW1|
    |      VEALAS42JNUSIT|
    |   OhPAhSW7765LhgXR1|
    |       QT544ONXPgfTC|
    +--------------------+
    only showing top 20 rows

So far, so good. However, when we add a 38th unique value to the list:

codes.append(("SLJ13N",14)) # Add 1 more item to the list
codeColumns = ["code_id", "numb"]
codeDF = spark.createDataFrame(data=codes, schema = codeColumns)
codeDF.printSchema()
distinctVals = codeDF.select("code_id").distinct()
print(distinctVals.count())
distinctVals.show()

We get an error saying that the shuffle failed because the stream is corrupted:

    org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 89 (count at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Stream is corrupted

Has anyone experienced this, or does anyone know what is wrong here? Why does it fail when the column has 38 unique values but work for 37 or fewer?

The full stack trace follows:

    An error was encountered:
    An error occurred while calling o412.count.
    : org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 89 (count at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:772)
        at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:845)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:129)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Stream is corrupted
        at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:200)
        at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:226)
        at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
        at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:841)
        ... 28 more

        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1871)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2676)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:147)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
        at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:184)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:183)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:404)
        at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3046)
        at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3045)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3722)
        at org.apache.spark.sql.Dataset.count(Dataset.scala:3045)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

    Traceback (most recent call last):
      File "/home/glue_user/spark/python/pyspark/sql/dataframe.py", line 665, in count
        return int(self._jdf.count())
      File "/home/glue_user/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/home/glue_user/spark/python/pyspark/sql/utils.py", line 111, in deco
        return f(*a, **kw)
      File "/home/glue_user/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling o412.count.
    : org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 89 (count at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
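
One way to rule out the data itself is to count the unique code_id values in plain Python, with no Spark shuffle involved. The sketch below is only a sanity check under that assumption; `distinct_codes` is a hypothetical helper and `sample` is a shortened stand-in for the full `codes` list above.

```python
def distinct_codes(rows):
    """Return the set of unique code_id values from (code_id, numb) tuples."""
    return {code_id for code_id, _ in rows}


# Shortened stand-in for the full `codes` list in the report.
sample = [
    ("QTC1HAN", 145),
    ("DLAMAZ", 654),
    ("QTC1HAN", 145),  # duplicate, counted once
    ("SLJ13UKEJNUKTN", 14),
]

before = len(distinct_codes(sample))
sample.append(("SLJ13N", 14))  # the extra value that triggers the failure in Spark
after = len(distinct_codes(sample))
print(before, after)
```

If the plain Python count rises by one as expected, the data is fine and the failure lies in how Spark writes or reads the shuffle blocks.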

SiddyP commented 2 years ago

I've been trying to get this to work properly (using lima-vm) for a few days and have recently given up.

I tried your example and get the same issue.

A few things I've experimented with that didn't seem to do much

It seems this container only works for toy examples.

Rafno commented 2 years ago

I have been having the same issue. I have not managed to override the Spark configs; my changes seem to have no effect.

SiddyP commented 2 years ago

@otg2 @Rafno see the response to my issue: #128. Try disabling encryption (--conf spark.io.encryption.enabled=false). It defaults to true in the baked-in config.

This solved our issues.
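
A minimal sketch of how the `--conf` override above could be assembled into a `spark-submit` command line. The helper `spark_submit_command` and the script name `distinct_repro.py` are hypothetical; only the `spark.io.encryption.enabled=false` setting comes from this thread.

```python
import shlex


def spark_submit_command(script, conf):
    """Build a spark-submit argument list with one --conf pair per setting."""
    cmd = ["spark-submit"]
    for key, value in conf.items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(script)
    return cmd


# `distinct_repro.py` is a placeholder name for the script that runs the failing query.
cmd = spark_submit_command(
    "distinct_repro.py",
    {"spark.io.encryption.enabled": "false"},
)
print(shlex.join(cmd))
```

The resulting list can be passed to `subprocess.run(cmd)` inside the container. The setting has to be in place before the Spark context starts, so changing it on a session that is already running is unlikely to help.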

Rafno commented 2 years ago

@SiddyP this resolved my issue. I am now overriding the config by mounting my own spark-defaults.conf with the following call:

    docker run -itd -p 8888:8888 -p 4040:4040 \
    -v ~/.aws:/home/glue_user/.aws:ro \
    -v $REPO_ROOT/notebooks://home/glue_user/workspace/jupyter_workspace \
    -v $REPO_ROOT/conf/spark-defaults.conf://home/glue_user/spark/conf/spark-defaults.conf \
    -e DISABLE_SSL="true" \
    -e AWS_ACCESS_KEY_ID=$(aws configure get aws_access_key_id) \
    -e AWS_SECRET_ACCESS_KEY=$(aws configure get aws_secret_access_key) \
    -e AWS_REGION=$AWS_REGION \
    amazon/aws-glue-libs:glue_libs_3.0.0_image_01 \
    /home/glue_user/jupyter/jupyter_start.sh

The mounted spark-defaults.conf replaces the baked-in file and disables encryption.
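
A sketch of how a copy of the baked-in spark-defaults.conf could be edited to disable encryption before mounting it. The helper `set_spark_default` is hypothetical, and the sample file contents are made up for illustration; the real baked-in file will contain other settings.

```python
import re


def set_spark_default(conf_text, key, value):
    """Return conf_text with `key` set to `value`, replacing any existing entry."""
    out = []
    replaced = False
    for line in conf_text.splitlines():
        stripped = line.strip()
        # Keep blank lines and comments untouched.
        if not stripped or stripped.startswith("#"):
            out.append(line)
            continue
        # The property name is everything up to the first whitespace or '='.
        name = re.split(r"[\s=]", stripped, maxsplit=1)[0]
        if name == key:
            if not replaced:
                out.append(f"{key} {value}")
                replaced = True
            continue  # drop any later duplicates of the same key
        out.append(line)
    if not replaced:
        out.append(f"{key} {value}")
    return "\n".join(out) + "\n"


# Made-up sample contents; only the encryption key comes from this thread.
baked_in = "# sample defaults\nspark.io.encryption.enabled true\nspark.master local\n"
patched = set_spark_default(baked_in, "spark.io.encryption.enabled", "false")
print(patched)
```

Writing `patched` to `$REPO_ROOT/conf/spark-defaults.conf` would give the file that the `docker run` call above mounts into the container.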