oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0

When submitting a script, another process starts locally on the head node #372

Open gbraes opened 10 months ago

gbraes commented 10 months ago

I am submitting this script to a Ray cluster (Ray 2.6.0 / RayDP 1.5):

from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import from_avro, to_avro

import requests
import json

configs = {}

import ray
import raydp

ray.init(address="auto")
spark = raydp.init_spark(app_name='RayDP stream Example', num_executors=2,
                         executor_cores=2, executor_memory='4GB', configs=configs)

The cluster is created with docker-compose and uses an external Redis; it has one head node and 3 independent workers.

When I submit my job like this:

RAY_ADDRESS='http://ray-head:8265' ray job submit --working-dir . --no-wait  --  python3 simple_pyspark_borrar.py

and later list the jobs:

RAY_ADDRESS='http://ray-head:8265' ray list jobs

I get 2 jobs:

[screenshot of the job list]

I have seen that the second job, running on the driver, corresponds to the line where raydp.init_spark is executed.

What am I doing wrong?

The main issue is that I usually run streaming processes with PySpark, and once submitted I cannot kill them completely because a process keeps running on the driver. Also, I am not sure whether this means that PySpark is running locally on the head node rather than on the cluster.
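(For what it's worth, one way to avoid a lingering driver-side process is to tear the session down explicitly. The sketch below assumes `raydp.stop_spark()` from RayDP's public API; the job body is a placeholder.)

```python
# Sketch: wrap the work in try/finally so the Spark-on-Ray session is torn
# down explicitly when the job ends, instead of leaving a driver process
# behind. raydp.stop_spark() is taken from RayDP's API; adapt as needed.

def run_job():
    import ray
    import raydp

    ray.init(address="auto")
    spark = raydp.init_spark(app_name="RayDP stream Example",
                             num_executors=2, executor_cores=2,
                             executor_memory="4GB")
    try:
        ...  # streaming work goes here
    finally:
        raydp.stop_spark()  # shuts down the executors and the Spark session
        ray.shutdown()
```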

kira-lin commented 10 months ago

Hi @gbraes, thanks for using RayDP! I'm afraid I don't understand your issue very well; I've never used the Ray job API before. What is the expected behavior? Also, I notice you are using Ray 2.6.0 with RayDP 1.5.0, but RayDP 1.5.0 does not support Ray 2.6.0. Have you tried running your script through the Ray client?
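(For context, connecting through Ray Client instead of `ray job submit` looks roughly like the sketch below; the driver then runs in the calling process rather than being wrapped in a job. `ray-head` is the compose service name from this thread and 10001 is Ray Client's default port, so adjust both for your setup.)

```python
# Sketch: connect to the cluster via Ray Client instead of submitting a job.
# "ray://host:10001" is Ray Client's address scheme and default port.
RAY_CLIENT_ADDRESS = "ray://ray-head:10001"

def connect_and_init_spark():
    import ray
    import raydp

    ray.init(address=RAY_CLIENT_ADDRESS)
    return raydp.init_spark(app_name="RayDP stream Example",
                            num_executors=2, executor_cores=2,
                            executor_memory="4GB")
```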

Also, I am not sure if that means that pyspark is running locally in the head node and not in the cluster.

You can visit Ray's dashboard to see whether the RayDP actors are distributed across all nodes. In addition, you can use a placement group to ensure they are spread out; otherwise the executors may all land on the same node simply because it has enough resources.
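(A minimal sketch of that suggestion: reserve one SPREAD bundle per executor so Ray places each executor on a different node. `ray.util.placement_group` is a stable Ray API; passing the group to `raydp.init_spark` via a `placement_group` keyword is an assumption based on RayDP's README and may differ across versions.)

```python
# Sketch: one CPU bundle per executor; strategy="SPREAD" asks Ray to put
# each bundle on a distinct node. The placement_group keyword passed to
# raydp.init_spark below is assumed, check your RayDP version.
NUM_EXECUTORS = 2
EXECUTOR_CORES = 2

bundles = [{"CPU": EXECUTOR_CORES} for _ in range(NUM_EXECUTORS)]

def init_spark_spread():
    import ray
    import raydp
    from ray.util.placement_group import placement_group

    ray.init(address="auto")
    pg = placement_group(bundles, strategy="SPREAD")
    ray.get(pg.ready())  # block until the bundles are actually reserved
    return raydp.init_spark(app_name="RayDP stream Example",
                            num_executors=NUM_EXECUTORS,
                            executor_cores=EXECUTOR_CORES,
                            executor_memory="4GB",
                            placement_group=pg)  # assumed keyword
```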