ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.3k stars 5.63k forks source link

Ray air train model do not stop #28095

Closed Alxe1 closed 2 years ago

Alxe1 commented 2 years ago

What happened + What you expected to happen

I trained a model using ray air, but it is always running and don't stop:

== Status ==
Current time: 2022-08-25 17:24:04 (running for 00:00:13.24)
Memory usage on this node: 30.0/61.5 GiB
Using FIFO scheduling algorithm.
Resources requested: 13.0/16 CPUs, 0/0 GPUs, 0.0/50.79 GiB heap, 0.0/30.0 GiB objects
Result logdir: /home/hadoop/ray_results/TensorflowTrainer_2022-08-25_17-23-51
Number of trials: 1/1 (1 RUNNING)
+-------------------------------+----------+------------------+
| Trial name                    | status   | loc              |
|-------------------------------+----------+------------------|
| TensorflowTrainer_a1da1_00000 | RUNNING  | xxx:32261 |
+-------------------------------+----------+------------------+

      1/Unknown - 5s 5s/step - loss: 1.0575 - auc: 0.5791
      1/Unknown - 5s 5s/step - loss: 1.0575 - auc: 0.5791
      2/Unknown - 5s 56ms/step - loss: 1.0361 - auc: 0.4868
      2/Unknown - 5s 53ms/step - loss: 1.0361 - auc: 0.4868
      2/Unknown - 5s 56ms/step - loss: 1.0361 - auc: 0.4868
      2/Unknown - 5s 57ms/step - loss: 1.0361 - auc: 0.4868
      2/Unknown - 5s 56ms/step - loss: 1.0361 - auc: 0.4868
      2/Unknown - 5s 54ms/step - loss: 1.0361 - auc: 0.4868
      2/Unknown - 5s 54ms/step - loss: 1.0361 - auc: 0.4868
      2/Unknown - 5s 61ms/step - loss: 1.0361 - auc: 0.4868
      2/Unknown - 5s 55ms/step - loss: 1.0361 - auc: 0.4868
     67/Unknown - 8s 43ms/step - loss: 0.5656 - auc: 0.7668
== Status ==ker pid=4304, ip=10.0.10.246) 
Current time: 2022-08-25 17:24:09 (running for 00:00:18.24)
Memory usage on this node: 30.6/61.5 GiB
Using FIFO scheduling algorithm.
Resources requested: 13.0/16 CPUs, 0/0 GPUs, 0.0/50.79 GiB heap, 0.0/30.0 GiB objects
Result logdir: /home/hadoop/ray_results/TensorflowTrainer_2022-08-25_17-23-51
Number of trials: 1/1 (1 RUNNING)
+-------------------------------+----------+------------------+
| Trial name                    | status   | loc              |
|-------------------------------+----------+------------------|
| TensorflowTrainer_a1da1_00000 | RUNNING  | xxxxx:32261 |
+-------------------------------+----------+------------------+

    115/Unknown - 10s 44ms/step - loss: 0.4112 - auc: 0.8613
    178/Unknown - 13s 44ms/step - loss: 0.3027 - auc: 0.9120 auc: 0.8613
    182/Unknown - 13s 44ms/step - loss: 0.2954 - auc: 0.9153 auc: 0.9134
== Status ==ker pid=4304, ip=10.0.10.246) 
Current time: 2022-08-25 17:24:14 (running for 00:00:23.24)
Memory usage on this node: 30.7/61.5 GiB
Using FIFO scheduling algorithm.
Resources requested: 13.0/16 CPUs, 0/0 GPUs, 0.0/50.79 GiB heap, 0.0/30.0 GiB objects
Result logdir: /home/hadoop/ray_results/TensorflowTrainer_2022-08-25_17-23-51
Number of trials: 1/1 (1 RUNNING)
+-------------------------------+----------+------------------+
| Trial name                    | status   | loc              |
|-------------------------------+----------+------------------|
| TensorflowTrainer_a1da1_00000 | RUNNING  | xxxxx:32261 |
+-------------------------------+----------+------------------+

    201/Unknown - 14s 44ms/step - loss: 0.2740 - auc: 0.9247
== Status ==ker pid=32700) p=10.0.10.246) 
Current time: 2022-08-25 17:24:19 (running for 00:00:28.24)
Memory usage on this node: 30.7/61.5 GiB
Using FIFO scheduling algorithm.
Resources requested: 13.0/16 CPUs, 0/0 GPUs, 0.0/50.79 GiB heap, 0.0/30.0 GiB objects
Result logdir: /home/hadoop/ray_results/TensorflowTrainer_2022-08-25_17-23-51
Number of trials: 1/1 (1 RUNNING)
+-------------------------------+----------+------------------+
| Trial name                    | status   | loc              |
|-------------------------------+----------+------------------|
| TensorflowTrainer_a1da1_00000 | RUNNING  | xxxxx:32261 |
+-------------------------------+----------+------------------+

== Status ==
Current time: 2022-08-25 17:24:24 (running for 00:00:33.25)
Memory usage on this node: 30.7/61.5 GiB
Using FIFO scheduling algorithm.
Resources requested: 13.0/16 CPUs, 0/0 GPUs, 0.0/50.79 GiB heap, 0.0/30.0 GiB objects
Result logdir: /home/hadoop/ray_results/TensorflowTrainer_2022-08-25_17-23-51
Number of trials: 1/1 (1 RUNNING)
+-------------------------------+----------+------------------+
| Trial name                    | status   | loc              |
|-------------------------------+----------+------------------|
| TensorflowTrainer_a1da1_00000 | RUNNING  | xxxxx:32261 |
+-------------------------------+----------+------------------+

Versions / Dependencies

ray 2.0.0 tensorflow 2.8.0 python 3.7.10

Reproduction script

# -*- coding: utf-8 -*-
import datetime

from pyspark import SparkConf, SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import QuantileDiscretizer, StringIndexer
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Layer, Dense, Dropout, BatchNormalization, Embedding
from tensorflow.keras.regularizers import l2

import ray
from ray.air import session, Checkpoint
from ray.air.config import ScalingConfig
from ray.air.callbacks.keras import Callback
from ray.data.preprocessors import OneHotEncoder, Concatenator
from ray.train.tensorflow import prepare_dataset_shard
from ray.train.tensorflow import TensorflowTrainer

DEEPCROSS_MODEL_PATH = "/models/deepcross_model"
PARQUET_PATH = "s3://my-bucket/tmp_train"

class DNNLayer(Layer):
    def __init__(self, hidden_units, activation="relu", dropout=0.2, use_batch_norm=False):
        super(DNNLayer, self).__init__()
        self.hidden_units = hidden_units
        self.activation = activation
        self.dropout = dropout
        self.use_batch_norm = use_batch_norm
        self.batch_norm = BatchNormalization()

        self._dropout = Dropout(dropout)

        self.nets = [Dense(units=units, activation=activation) for units in hidden_units]

    def call(self, inputs, **kwargs):
        x = inputs
        for i, net in enumerate(self.nets):
            x = net(x)
            if i < len(self.nets) - 1:
                x = self._dropout(x)
        if self.use_batch_norm:
            x = self.batch_norm(x)
        return x

    def get_config(self):
        config = super().get_config()
        config.update({"hidden_units": self.hidden_units,
                       "activation": self.activation,
                       "dropout": self.dropout,
                       "use_batch_norm": self.use_batch_norm})
        return config

class CrossLayer(Layer):
    def __init__(self, cross_num=3):

        super(CrossLayer, self).__init__()
        self.cross_num = cross_num

    def build(self, input_shape):
        self.cross_weight_list = [self.add_weight(name="w_{}".format(i),
                                                  shape=(input_shape[-1], 1),
                                                  initializer="random_normal"
                                                  )
                                  for i in range(self.cross_num)]
        self.cross_bias = [self.add_weight(name="bias_{}".format(i),
                                           shape=(input_shape[-1], 1),
                                           initializer="random_normal"
                                           )
                           for i in range(self.cross_num)]

    def call(self, inputs, **kwargs):
        # inputs: (batch, dim)
        x0 = tf.expand_dims(inputs, axis=-1)  # (batch, dim, 1)
        xl = x0
        for i in range(self.cross_num):
            _xl = tf.tensordot(xl, self.cross_weight_list[i], axes=[1, 0])  # batch, 1, 1
            xl = tf.matmul(x0, _xl) + self.cross_bias[i] + xl  # batch, dim, 1
        xl = tf.squeeze(xl, axis=-1)  # batch, dim
        return xl

    def get_config(self):
        config = super().get_config()
        config.update({"cross_num": self.cross_num})
        return config

class DeepCross(Model):
    def __init__(self,
                 user_num,
                 item_num,
                 user_item_dim,
                 feature_embed_dim,
                 embed_norm,
                 dnn_hidden_units,
                 dnn_activation="relu",
                 dnn_dropout=0.2,
                 cross_num=3):

        super(DeepCross, self).__init__()
        self.user_num = user_num
        self.item_num = item_num
        self.user_item_dim = user_item_dim
        self.feature_embed_dim = feature_embed_dim
        self.embed_norm = embed_norm
        self.dnn_hidden_units = dnn_hidden_units
        self.dnn_activation = dnn_activation
        self.dnn_dropout = dnn_dropout
        self.cross_num = cross_num

        self.user_embedding = Embedding(input_dim=user_num,
                                        output_dim=user_item_dim,
                                        embeddings_initializer="random_normal",
                                        embeddings_regularizer=l2(embed_norm),
                                        input_length=1)
        self.item_embedding = Embedding(input_dim=item_num,
                                        output_dim=user_item_dim,
                                        embeddings_initializer="random_normal",
                                        embeddings_regularizer=l2(embed_norm),
                                        input_length=1)

        self.dnn_layer = DNNLayer(hidden_units=dnn_hidden_units, activation=dnn_activation, dropout=dnn_dropout)
        self.cross_layer = CrossLayer(cross_num=cross_num)
        self.dense = Dense(1)

    def call(self, inputs, training=None, mask=None):
        user_embed = self.user_embedding(inputs[:, 0])
        item_embed = self.item_embedding(inputs[:, 1])

        sparse_feature = inputs[:, 2:]

        input_features = tf.concat([user_embed, item_embed, sparse_feature], axis=-1)

        dnn_output = self.dnn_layer(input_features)
        cross_output = self.cross_layer(input_features)
        output = tf.concat([dnn_output, cross_output], axis=-1)
        output = self.dense(output)

        output = tf.nn.sigmoid(output)
        return output

    def get_config(self):
        configs = super(DeepCross, self).get_config()
        configs.update({
            "user_num": self.user_num,
            "item_num": self.item_num,
            "user_item_dim": self.user_item_dim,
            "feature_embed_dim": self.feature_embed_dim,
            "dnn_hidden_units": self.dnn_hidden_units,
            "dnn_activation": self.dnn_activation,
            "dnn_dropout": self.dnn_dropout,
            "cross_num": self.cross_num
        })
        return configs

def train_func(config):
    batch_size = config.get("batch_size", 128)
    epochs = config.get("epochs", 3)

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        multi_worker_model = DeepCross(
            user_num=config.get("user_num"),
            item_num=config.get("item_num"),
            user_item_dim=config.get("user_item_dim"),
            feature_embed_dim=config.get("feature_embed_dim"),
            embed_norm=config.get("embed_norm"),
            dnn_hidden_units=config.get("hidden_units")
        )
        loss = tf.keras.losses.BinaryCrossentropy()
        optimizer = tf.keras.optimizers.Adam()
        multi_worker_model.compile(optimizer=optimizer, loss=loss, metrics=["AUC"])

    dataset = session.get_dataset_shard("train")

    def to_tf_dataset(dataset: ray.data.Dataset, batch_size: int):
        def to_tensor_iterator():
            for batch in dataset.iter_tf_batches(batch_size=batch_size, dtypes=tf.float32):
                yield batch["x"], batch["is_click"]

        output_signature = (
            tf.TensorSpec(shape=(None, config.get("sparse_num")), dtype=tf.float32),
            tf.TensorSpec(shape=(None, ), dtype=tf.int32)
        )
        tf_dataset = tf.data.Dataset.from_generator(to_tensor_iterator, output_signature=output_signature)

        return prepare_dataset_shard(tf_dataset)

    tf_dataset = to_tf_dataset(dataset=dataset, batch_size=batch_size)

    history = multi_worker_model.fit(tf_dataset, epochs=epochs, callbacks=[Callback()])

    multi_worker_model.save("./multi_worker_model", save_format="tf")

    ckpt = Checkpoint.from_directory("./multi_worker_model")
    result = history.history
    session.report(result, checkpoint=ckpt)

    return result

def preprocessing_data(parquet_path):
    ray_dataset = ray.data.read_parquet(parquet_path)
    print("ray dataset: {}".format(ray_dataset.take(1)))

    bucket_features = ["gender", "b_age", "room_type_id", "room_channel", "follow_count_bucket"]

    cate_num = len(bucket_features) + 2
    max_index = ray_dataset.max(["uid_index", "vid_index"])
    uid_num = max_index["max(uid_index)"]
    vid_num = max_index["max(vid_index)"]

    one_hot_encoder = OneHotEncoder(columns=bucket_features)
    one_hot_encoder.fit(ray_dataset)
    transformed_dataset = one_hot_encoder.transform(ray_dataset)

    sparse_num = len(transformed_dataset.take(1)[0])

    # Concatenate
    concatenator = Concatenator(exclude=["is_click"], output_column_name="x")
    transformed_dataset = concatenator.fit_transform(transformed_dataset)
    print(transformed_dataset.take(1))

    config = {"user_num": int(uid_num), "item_num": int(vid_num),
              "cate_num": int(cate_num), "sparse_num": int(sparse_num-1)} 
    print(config)

    return config, transformed_dataset

def train_test():
    config, dataset = preprocessing_data(PARQUET_PATH)
    config.update({"user_item_dim": 32, "feature_embed_dim": 16, "embed_norm": 0.001, "hidden_units": [64, 32, 32]})

    num_workers = 12
    use_gpu = False

    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        datasets={"train": dataset}
    )

    result = trainer.fit()
    print(result.metrics)
    print(result.checkpoint)
    print(result.best_checkpoints)

if __name__ == '__main__':
    train_test()

Issue Severity

No response

xwjiang2010 commented 2 years ago

@Alxe1 How long does it hang or do you wait till you killed the program? Did you get a chance to see the ray dashboard? Wonder if any work is actually being done. I notice that from the console, there is not even one training result reported back yet. So wonder if anything is wrong at the tensorflow layer.

Alxe1 commented 2 years ago

@Alxe1 How long does it hang or do you wait till you killed the program? Did you get a chance to see the ray dashboard? Wonder if any work is actually being done. I notice that from the console, there is not even one training result reported back yet. So wonder if anything is wrong at the tensorflow layer.

Yes, it hang a long time, and I use a small dataset to test it, it also hang a long time and can not stop too. But when I put the small dataset in train_func like:

def train_func(config):
    # -------------------------PUT DATASET HERE--------------------------
    config, dataset = preprocessing_data(PARQUET_PATH)
    config.update({"user_item_dim": 32, "feature_embed_dim": 16, "embed_norm": 0.001, "hidden_units": [64, 32, 32]})
    # ------------------------------------------------------------------------

    batch_size = config.get("batch_size", 1024)
    epochs = config.get("epochs", 3)

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

It works! It's so weired. But when I increase the dataset to bigger, it also hang a long time. The resources are:

======== Autoscaler status: 2022-08-26 12:04:02.317127 ========
Node status
---------------------------------------------------------------
Healthy:
 1 node_2fa56f09e075d36c8130b048f6e84530c73dba93419c43c9590ef108
 1 node_9e5595a0042e88dc330c96f706c3ab99e52229a4a4cdbda44515369b
 1 node_c8ba399fb7cf0e5c4fa17d5a9122b76e017e5e2ad13a4d3feed3c07d
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 9.0/16.0 CPU (9.0 used of 9.0 reserved in placement groups)
 0.00/50.793 GiB memory
 0.09/30.000 GiB object_store_memory

Demands:
 (no resource demands)

And the dashboard:

image

xwjiang2010 commented 2 years ago

Thank you so much! Could you also share me with the parquet file so that I can run it and debug?

matthewdeng commented 2 years ago

@Alxe1 are you still running into this issue? If so can you provide a repro?

Alxe1 commented 2 years ago

@Alxe1 are you still running into this issue? If so can you provide a repro?

It's not appeared so far when I restart Ray cluster. But the processes of ray::IDE sometimes still are there when the program is done no matter what I run. #28199

richardliaw commented 2 years ago

Awesome! will close for now.