kestra-io / plugin-spark


PySpark tasks get instantly killed - can't run a Hello World with the Spark plugin #27

Closed: anna-geller closed this issue 1 year ago

anna-geller commented 1 year ago

Issue description

Simple reproducer

DOCKER runner

id: spark
namespace: dev

tasks:
  - id: spark
    type: io.kestra.plugin.spark.PythonSubmit
    runner: DOCKER
    warningOnStdErr: false
    dockerOptions:
      image: tabulario/spark-iceberg
      # networkMode: host
      user: root
      entryPoint: 
      - /bin/sh
      - -c
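    # note: host-docker.internal is likely a typo for Docker's host.docker.internal,
    # and 8082 is the remapped web UI port, not the master RPC port (see the discussion below)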
    master: spark://host-docker.internal:8082
    mainScript: |
      from pyspark.sql import SparkSession
      spark = SparkSession.builder.appName("HelloWorldApp").getOrCreate()
      df = spark.createDataFrame([('Hello World',)], ['greeting'])
      df.show()
      spark.stop()

Error:

This stopped SparkContext was created at:
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:829)
The currently active SparkContext was created at:
(No active SparkContext.)

    at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:120)
    at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:114)
    at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:107)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)

23/07/31 11:59:12 INFO BlockManager: BlockManager stopped
23/07/31 11:59:12 INFO BlockManagerMaster: BlockManagerMaster stopped
23/07/31 11:59:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/07/31 11:59:12 INFO SparkContext: Successfully stopped SparkContext
23/07/31 11:59:12 INFO ShutdownHookManager: Shutdown hook called
23/07/31 11:59:12 INFO ShutdownHookManager: Deleting directory /tmp/spark-9eed1eb2-7a61-4ee9-91cf-38e6cdddb782
23/07/31 11:59:12 INFO ShutdownHookManager: Deleting directory /tmp/spark-a99f8b9c-f1e7-44dd-92b5-78c555bdfe79
23/07/31 11:59:12 INFO ShutdownHookManager: Deleting directory /tmp/spark-a99f8b9c-f1e7-44dd-92b5-78c555bdfe79/pyspark-9de8bf24-3aab-486d-843c-3c505b6c7176

Command failed with code 1
anna-geller commented 1 year ago

You can use the following docker-compose file to start a local Spark cluster and reproduce the issue; run docker compose up:

version: "3"

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    ports:
      - 8888:8888
      - 8082:8080 # to avoid port conflict with Kestra
      - 10000:10000
      - 10001:10001
loicmathieu commented 1 year ago

The Spark master listens on port 7077, so you should expose that port in the docker-compose file and use it in the task. Even with that, I didn't manage to connect to the master.
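
For reference, a minimal sketch of that port change, assuming the docker-compose file from the previous comment: add the Spark master port to the ports list of the spark-iceberg service.

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    ports:
      - 8888:8888
      - 8082:8080 # web UI, remapped to avoid a port conflict with Kestra
      - 10000:10000
      - 10001:10001
      - 7077:7077 # Spark master RPC port, targeted by spark:// master URLs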

anna-geller commented 1 year ago

Thanks for reproducing. Let's focus on Athena then, and we can keep this issue open for now 👍

loicmathieu commented 1 year ago

You should both open port 7077 and, if using the Docker runner in your task, set networkMode to host. Doing both will make it work.

loicmathieu commented 1 year ago

The following flow works with the master port exposed as 7077 in the docker-compose file:

id: spark-submit
namespace: dev
tasks:
  - id: spark
    type: io.kestra.plugin.spark.PythonSubmit
    runner: DOCKER
    warningOnStdErr: false
    dockerOptions:
      image: tabulario/spark-iceberg
      networkMode: host
      user: root
      entryPoint: 
      - /bin/sh
      - -c
    master: spark://localhost:7077
    mainScript: |
      from pyspark.sql import SparkSession
      spark = SparkSession.builder.appName("HelloWorldApp").getOrCreate()
      df = spark.createDataFrame([('Hello World',)], ['greeting'])
      df.show()
      spark.stop()
anna-geller commented 1 year ago

works indeed! <3 thx so much
