oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0

how to init raydp resource #408

Open yasaorder opened 1 month ago

yasaorder commented 1 month ago

My env is Python 3.9, RayDP 1.6, Java 8, PySpark 3.1.1. When I run the code below, I get the warnings below. How can I init the Spark resources? And is it necessary to install the pyspark package and set env variables such as SCALA_HOME, JAVA_HOME, or SPARK_HOME?

WARNING:
2024-05-23 21:36:07,404 INFO worker.py:1519 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
(pid=370364) 
(RayDPSparkMaster pid=371139) 2024-05-23 21:36:12,467 Thread-2 INFO Log4j appears to be running in a Servlet environment, but there's no log4j-web module available. If you want better web container support, please add the log4j-web JAR to your web archive or server lib directory.
(RayDPSparkMaster pid=371139) 2024-05-23 21:36:12,471 Thread-2 INFO Log4j appears to be running in a Servlet environment, but there's no log4j-web module available. If you want better web container support, please add the log4j-web JAR to your web archive or server lib directory.
(RayDPSparkMaster pid=371139) 2024-05-23 21:36:12,485 Thread-2 INFO Log4j appears to be running in a Servlet environment, but there's no log4j-web module available. If you want better web container support, please add the log4j-web JAR to your web archive or server lib directory.
(RayDPSparkMaster pid=371139) 2024-05-23 21:36:12,502 Thread-2 INFO Log4j appears to be running in a Servlet environment, but there's no log4j-web module available. If you want better web container support, please add the log4j-web JAR to your web archive or server lib directory.
(pid=370363) 
24/05/23 17:29:53 WARN Utils: Your hostname, whu-All-Series resolves to a loopback address: 127.0.1.1; using 192.168.0.103 instead (on interface eno1)
24/05/23 17:29:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/23 17:29:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

24/05/23 17:30:17 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/05/23 17:30:32 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/05/23 17:30:47 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
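From what I can tell, the repeated TaskSchedulerImpl warning usually means the Ray cluster does not have enough CPUs for the requested executors (4 executors x 4 cores = 16 CPUs in my code below). A minimal sketch of what making the CPU budget explicit might look like, assuming the machine actually has 16 cores (the numbers here are illustrative, not from my run):

# Sketch: give Ray an explicit CPU budget that covers num_executors * executor_cores.
import ray
import raydp

ray.init(num_cpus=16)  # 4 executors x 4 cores are requested below

spark = raydp.init_spark(app_name='RayDP Example 2',
                         num_executors=4,
                         executor_cores=4,
                         executor_memory='2GB')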

My code is below:

CODE:
import ray
ray.init()

import raydp

spark = raydp.init_spark(app_name='RayDP Example 2',
                         num_executors=4,
                         executor_cores=4,
                         executor_memory='2GB')

# # normal data processing with Spark
# df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
# df.show()
# word_count = df.groupBy('word').count()
# word_count.show()
#
# # stop the spark cluster
# raydp.stop_spark()

from pyspark.sql.functions import col
df = spark.range(1, 1000)
# calculate z = x + 2y + 1000
df = df.withColumn("x", col("id")*2)\
  .withColumn("y", col("id") + 200)\
  .withColumn("z", col("x") + 2*col("y") + 1000)

from raydp.utils import random_split
train_df, test_df = random_split(df, [0.7, 0.3])

# PyTorch Code
import torch
class LinearModel(torch.nn.Module):
    def __init__(self):
        super(LinearModel, self).__init__()
        self.linear = torch.nn.Linear(2, 1)

    def forward(self, x, y):
        x = torch.cat([x, y], dim=1)
        return self.linear(x)

model = LinearModel()
optimizer = torch.optim.Adam(model.parameters())
loss_fn = torch.nn.MSELoss()

def lr_scheduler_creator(optimizer, config):
    return torch.optim.lr_scheduler.MultiStepLR(
      optimizer, milestones=[150, 250, 350], gamma=0.1)

# You can use the RayDP Estimator API or libraries like Ray Train for distributed training.
from raydp.torch import TorchEstimator
estimator = TorchEstimator(
  num_workers=2,
  model=model,
  optimizer=optimizer,
  loss=loss_fn,
  lr_scheduler_creator=lr_scheduler_creator,
  feature_columns=["x", "y"],
  label_column="z",
  batch_size=1000,
  num_epochs=2
)
# raydp.torch.TorchEstimator
estimator.fit_on_spark(train_df, test_df)

pytorch_model = estimator.get_model()

estimator.shutdown()
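
If it helps, a possible follow-up after training (just a sketch continuing the script above, not something I have verified): check the returned model on one row and then release the Spark-on-Ray resources.

# Sketch: sanity-check the trained model, then clean up the Spark cluster on Ray.
with torch.no_grad():
    x = torch.tensor([[2.0]])    # x = id*2 for id=1
    y = torch.tensor([[201.0]])  # y = id+200 for id=1
    print(pytorch_model(x, y))   # should approach z = x + 2*y + 1000 = 1404 with enough training

raydp.stop_spark()  # release the Spark executors started on Ray
ray.shutdown()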
yasaorder commented 1 month ago

Actually, I mean: is it necessary to install a pyspark/spark-bin-hadoop distribution as described here: https://spark.apache.org/docs/3.3.1/api/python/getting_started/install.html, or do I just need to install raydp and Java?
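
A quick way to check whether the pip-installed pyspark is being picked up (a sketch; it assumes pip install raydp pulls in pyspark as a dependency, so no separate spark-bin-hadoop download or SPARK_HOME should be needed, only a Java runtime):

# Sketch: verify the Python-side Spark install and the Java runtime location.
import os
import pyspark

print("pyspark version:", pyspark.__version__)        # should print 3.1.1 here
print("JAVA_HOME:", os.environ.get("JAVA_HOME"))      # Java 8+ must still be installed
print("SPARK_HOME:", os.environ.get("SPARK_HOME"))    # typically unset when using pip-installed pyspark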