airscholar / SparkingFlow

This project demonstrates how to use Apache Airflow to submit jobs to an Apache Spark cluster in different programming languages, using Python, Scala, and Java as examples.
https://www.datamasterylab.com/home/course/apache-airflow-on-steriods-for-data-engineers/9

SparkSubmitOperator stuck #3

Closed Evgeny-Larin closed 5 months ago

Evgeny-Larin commented 5 months ago

Hello! I configured Spark in the Docker Compose file according to the instructions from the video and edited the Dockerfile as suggested by weldermartins in the previous issue.
When I launch a SparkSubmitOperator (or a BashOperator with the command spark-submit --master spark://spark-master:7077 test_spark_job.py) in Airflow, I can see the job being created on the Spark master, but its execution gets stuck.
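For reference, a minimal sketch of the two task variants I mean (assuming Airflow 2.x with apache-airflow-providers-apache-spark installed and a spark_default connection pointing at spark://spark-master:7077; the DAG id, task ids and file paths here are illustrative, not copied from the repo):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="test_dag",
    start_date=datetime(2023, 6, 4),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Variant 1: submit through the Spark provider's operator
    spark_job = SparkSubmitOperator(
        task_id="spark_job",
        conn_id="spark_default",  # connection pointing at spark://spark-master:7077
        application="/opt/airflow/spark_jobs/test_spark_job.py",  # assumed path
    )

    # Variant 2: call spark-submit directly, as in the task log below
    spark_job2 = BashOperator(
        task_id="spark_job2",
        bash_command=(
            "spark-submit --master spark://spark-master:7077 "
            "/opt/airflow/spark_jobs/test_spark_job.py"
        ),
    )
```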
If you open the Airflow task logs, you can see the following output:

[2024-01-26, 06:43:18 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_dag.spark_job2 scheduled__2023-06-04T00:00:00+00:00 [queued]>  
[2024-01-26, 06:43:18 UTC] {taskinstance.py:1157} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_dag.spark_job2 scheduled__2023-06-04T00:00:00+00:00 [queued]>  
[2024-01-26, 06:43:18 UTC] {taskinstance.py:1359} INFO - Starting attempt 1 of 1  
[2024-01-26, 06:43:18 UTC] {taskinstance.py:1380} INFO - Executing <Task(BashOperator): spark_job2> on 2023-06-04 00:00:00+00:00  
[2024-01-26, 06:43:18 UTC] {standard_task_runner.py:57} INFO - Started process 939 to run task  
[2024-01-26, 06:43:18 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_dag', 'spark_job2', 'scheduled__2023-06-04T00:00:00+00:00', '--job-id', '6', '--raw', '--subdir', 'DAGS_FOLDER/test_dag/test_dag.py', '--cfg-path', '/tmp/tmpmaxt88kt']   
[2024-01-26, 06:43:18 UTC] {standard_task_runner.py:85} INFO - Job 6: Subtask spark_job2   
[2024-01-26, 06:43:18 UTC] {task_command.py:415} INFO - Running <TaskInstance: test_dag.spark_job2 scheduled__2023-06-04T00:00:00+00:00 [running]> on host cc02b800ce22  
[2024-01-26, 06:43:18 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='ELarin' AIRFLOW_CTX_DAG_ID='test_dag' AIRFLOW_CTX_TASK_ID='spark_job2' AIRFLOW_CTX_EXECUTION_DATE='2023-06-04T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-06-04T00:00:00+00:00'  
[2024-01-26, 06:43:18 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp  
[2024-01-26, 06:43:18 UTC] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'spark-submit --master spark://spark-master:7077 /opt/***/spark_jobs/test_spark_job.py']  
[2024-01-26, 06:43:18 UTC] {subprocess.py:86} INFO - Output:  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SparkContext: Running Spark version 3.5.0  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SparkContext: OS info Linux, 5.15.133.1-microsoft-standard-WSL2, amd64  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SparkContext: Java version 11.0.22  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO ResourceUtils:  
 ==============================================================  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO ResourceUtils: No custom resources configured for spark.driver.  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO ResourceUtils:  
 ==============================================================  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SparkContext: Submitted application: my_test_spark  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO ResourceProfile: Limiting resource is cpu  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO ResourceProfileManager: Added ResourceProfile id: 0  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SecurityManager: Changing view acls to: ***  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SecurityManager: Changing modify acls to: ***  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SecurityManager: Changing view acls groups to:  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SecurityManager: Changing modify acls groups to:  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SecurityManager: SecurityManager:  authentication disabled; ui acls disabled; users with view permissions: ***; groups with view permissions: EMPTY; users with modify permissions: ***; groups with modify permissions: EMPTY  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO Utils: Successfully started service 'sparkDriver' on port 44461.  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SparkEnv: Registering MapOutputTracker  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SparkEnv: Registering BlockManagerMaster  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SparkEnv: Registering BlockManagerMasterHeartbeat  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-4e264f5a-46c0-4ce3-9881-6467b5750e7b  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO SparkEnv: Registering OutputCommitCoordinator  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI  
[2024-01-26, 06:43:20 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark-master:7077...  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO TransportClientFactory: Successfully created connection to spark-master/172.20.0.4:7077 after 18 ms (0 ms spent in bootstraps)  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20240126064321-0003  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240126064321-0003/0 on worker-20240126062622-172.20.0.9-35479 (172.20.0.9:35479) with 2 core(s)  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO StandaloneSchedulerBackend: Granted executor ID app-20240126064321-0003/0 on hostPort 172.20.0.9:35479 with 2 core(s), 1024.0 MiB RAM  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20240126064321-0003/1 on worker-20240126062622-172.20.0.10-41101 (172.20.0.10:41101) with 2 core(s)  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO StandaloneSchedulerBackend: Granted executor ID app-20240126064321-0003/1 on hostPort 172.20.0.10:41101 with 2 core(s), 1024.0 MiB RAM  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 38041.  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO NettyBlockTransferService: Server created on cc02b800ce22:38041  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, cc02b800ce22, 38041, None)  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO BlockManagerMasterEndpoint: Registering block manager cc02b800ce22:38041 with 434.4 MiB RAM, BlockManagerId(driver, cc02b800ce22, 38041, None)  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, cc02b800ce22, 38041, None)  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, cc02b800ce22, 38041, None)  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240126064321-0003/1 is now RUNNING  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20240126064321-0003/0 is now RUNNING  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.  
[2024-01-26, 06:43:21 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:21 INFO SharedState: Warehouse path is 'file:/tmp/***tmp3080o3il/spark-warehouse'.  
[2024-01-26, 06:43:22 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:22 INFO StandaloneSchedulerBackend$StandaloneDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.20.0.10:58654) with ID 1,  ResourceProfileId 0  
[2024-01-26, 06:43:22 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:22 INFO StandaloneSchedulerBackend$StandaloneDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.20.0.9:45888) with ID 0,  ResourceProfileId 0  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO BlockManagerMasterEndpoint: Registering block manager 172.20.0.10:42685 with 434.4 MiB RAM, BlockManagerId(1, 172.20.0.10, 42685, None)  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO BlockManagerMasterEndpoint: Registering block manager 172.20.0.9:37703 with 434.4 MiB RAM, BlockManagerId(0, 172.20.0.9, 37703, None)  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO CodeGenerator: Code generated in 121.255744 ms  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO DAGScheduler: Got job 0 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO DAGScheduler: Final stage: ResultStage 0 (showString at NativeMethodAccessorImpl.java:0)  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO DAGScheduler: Parents of final stage: List()  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO DAGScheduler: Missing parents: List()  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 7.5 KiB, free 434.4 MiB)  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 3.8 KiB, free 434.4 MiB)  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on cc02b800ce22:38041 (size: 3.8 KiB, free: 434.4 MiB)  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1580  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at showString at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0  
[2024-01-26, 06:43:23 UTC] {subprocess.py:93} INFO - 24/01/26 06:43:23 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (172.20.0.10, executor 1, partition 0, PROCESS_LOCAL, 7827 bytes)  

If I open stderr in the worker's web interface, I see the following logs:

Spark Executor Command: "/opt/bitnami/java/bin/java" "-cp" "/opt/bitnami/spark/conf/:/opt/bitnami/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44461" "-Djava.net.preferIPv6Addresses=false" "-XX:+IgnoreUnrecognizedVMOptions" "--add-opens=java.base/java.lang=ALL-UNNAMED" "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED" "--add-opens=java.base/java.io=ALL-UNNAMED" "--add-opens=java.base/java.net=ALL-UNNAMED" "--add-opens=java.base/java.nio=ALL-UNNAMED" "--add-opens=java.base/java.util=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED" "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED" "--add-opens=java.base/sun.security.action=ALL-UNNAMED" "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED" "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED" "-Djdk.reflect.useDirectMethodHandle=false" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@cc02b800ce22:44461" "--executor-id" "1" "--hostname" "172.20.0.10" "--cores" "2" "--app-id" "app-20240126064321-0003" "--worker-url" "spark://Worker@172.20.0.10:41101" "--resourceProfileId" "0"  
========================================  

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties  
24/01/26 06:43:21 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 334@17282e67d0e6  
24/01/26 06:43:21 INFO SignalUtils: Registering signal handler for TERM  
24/01/26 06:43:21 INFO SignalUtils: Registering signal handler for HUP  
24/01/26 06:43:21 INFO SignalUtils: Registering signal handler for INT  
24/01/26 06:43:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  
24/01/26 06:43:22 INFO SecurityManager: Changing view acls to: spark,airflow  
24/01/26 06:43:22 INFO SecurityManager: Changing modify acls to: spark,airflow  
24/01/26 06:43:22 INFO SecurityManager: Changing view acls groups to:   
24/01/26 06:43:22 INFO SecurityManager: Changing modify acls groups to:   
24/01/26 06:43:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark, airflow; groups with view permissions: EMPTY; users with modify permissions: spark, airflow; groups with modify permissions: EMPTY  
24/01/26 06:43:22 INFO TransportClientFactory: Successfully created connection to cc02b800ce22/172.20.0.12:44461 after 48 ms (0 ms spent in bootstraps)  
24/01/26 06:43:22 INFO SecurityManager: Changing view acls to: spark,airflow  
24/01/26 06:43:22 INFO SecurityManager: Changing modify acls to: spark,airflow  
24/01/26 06:43:22 INFO SecurityManager: Changing view acls groups to:   
24/01/26 06:43:22 INFO SecurityManager: Changing modify acls groups to:   
24/01/26 06:43:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark, airflow; groups with view permissions: EMPTY; users with modify permissions: spark, airflow; groups with modify permissions: EMPTY  
24/01/26 06:43:22 INFO TransportClientFactory: Successfully created connection to cc02b800ce22/172.20.0.12:44461 after 1 ms (0 ms spent in bootstraps)  
24/01/26 06:43:22 INFO DiskBlockManager: Created local directory at /tmp/spark-3bba1eb1-1f77-4c8d-b8ab-5dcabf62b37c/executor-ae53daab-ff09-4598-858e-5b94bb829e80/blockmgr-8670dd9d-aee0-4876-a88a-06ea1400aa7c  
24/01/26 06:43:22 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB  
24/01/26 06:43:22 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@cc02b800ce22:44461  
24/01/26 06:43:22 INFO WorkerWatcher: Connecting to worker spark://Worker@172.20.0.10:41101  
24/01/26 06:43:22 INFO TransportClientFactory: Successfully created connection to /172.20.0.10:41101 after 2 ms (0 ms spent in bootstraps)  
24/01/26 06:43:22 INFO WorkerWatcher: Successfully connected to spark://Worker@172.20.0.10:41101  
24/01/26 06:43:22 INFO ResourceUtils:  
 ==============================================================  
24/01/26 06:43:22 INFO ResourceUtils: No custom resources configured for spark.executor.  
24/01/26 06:43:22 INFO ResourceUtils:  
 ==============================================================  
24/01/26 06:43:22 INFO CoarseGrainedExecutorBackend: Successfully registered with driver  
24/01/26 06:43:22 INFO Executor: Starting executor ID 1 on host 172.20.0.10  
24/01/26 06:43:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42685.  
24/01/26 06:43:23 INFO NettyBlockTransferService: Server created on 172.20.0.10:42685  
24/01/26 06:43:23 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy  
24/01/26 06:43:23 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, 172.20.0.10, 42685, None)  
24/01/26 06:43:23 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, 172.20.0.10, 42685, None)  
24/01/26 06:43:23 INFO BlockManager: Initialized BlockManager: BlockManagerId(1, 172.20.0.10, 42685, None)  
24/01/26 06:43:23 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''  
24/01/26 06:43:23 ERROR Inbox: Ignoring error  
java.io.EOFException  
    at java.base/java.io.DataInputStream.readFully(DataInputStream.java:203)  
    at java.base/java.io.DataInputStream.readUTF(DataInputStream.java:602)  
    at java.base/java.io.DataInputStream.readUTF(DataInputStream.java:558)  
    at org.apache.spark.scheduler.TaskDescription$.deserializeResources(TaskDescription.scala:150)  
    at org.apache.spark.scheduler.TaskDescription$.decode(TaskDescription.scala:198)  
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:188)  
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)  
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)  
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)  
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)  
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)  
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)  
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)  
    at java.base/java.lang.Thread.run(Thread.java:840)  

I also noticed that if the submitted code contains only the lines that obtain the Spark session and its configuration, it runs fine.
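Roughly, the job looks like this (the app name my_test_spark and the showString call match the logs above; the exact DataFrame is illustrative):

```python
from pyspark.sql import SparkSession

# Creating and configuring the session alone completes without problems...
spark = (
    SparkSession.builder
    .appName("my_test_spark")
    .getOrCreate()
)

# ...but as soon as an action that ships tasks to the executors runs
# (e.g. show(), which produces the showString job in the driver log),
# the job hangs and the executor logs the EOFException shown above.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.show()

spark.stop()
```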

Also, if I go directly into the Spark container and run spark-submit --master spark://spark-master:7077 test_spark_job.py there, the task completes successfully.

Has anyone encountered this problem? I have searched all over the internet and couldn't find an answer.
Here is my code: link

Evgeny-Larin commented 5 months ago

The problem was a mismatch between the Spark versions in the Dockerfile and in the Docker Compose file. It turns out that with this type of deployment, the Spark installed into the Airflow image (by us, via the Dockerfile) acts as the client that drives jobs on the Spark cluster from the Docker Compose file, so the two versions have to match. I found information about this here: link
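For anyone else hitting this: the point is to keep the Spark version in the Airflow image in lockstep with the Spark version of the cluster images. A rough sketch of that alignment (the base image tag and the JRE setup are assumptions, not copied from this repo; the bitnami/spark image and Spark 3.5.0 come from the logs above):

```dockerfile
# Dockerfile for the Airflow image (sketch): pin pyspark to the cluster's Spark version.
# A JRE is also required in this image for spark-submit; its installation is omitted here.
FROM apache/airflow:2.8.1
# 3.5.0 must match the bitnami/spark image tag used in docker-compose.yml
RUN pip install --no-cache-dir pyspark==3.5.0 apache-airflow-providers-apache-spark
```

```yaml
# docker-compose.yml excerpt (sketch): the whole cluster runs the same Spark version.
services:
  spark-master:
    image: bitnami/spark:3.5.0
  spark-worker:
    image: bitnami/spark:3.5.0
```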