[Core] After running script, the ray::IDLE processes do not stop #28199

Closed. Alxe1 closed this issue 1 year ago.

Alxe1 commented 1 year ago

What happened + What you expected to happen

When the script finishes running, the ray::IDLE processes do not stop automatically:

[Screenshot: ps output showing lingering ray::IDLE worker processes]
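For illustration, a minimal sketch of the kind of run after which ray::IDLE workers typically remain visible in ps (the names here are illustrative, not the reporter's code; the actual training script is posted further down the thread, and it assumes a local ray.init()):

import time

import ray


@ray.remote
def square(x):
    # Trivial task; each concurrently running task occupies one worker process.
    return x * x


ray.init()  # start (or connect to) a local Ray instance
print(ray.get([square.remote(i) for i in range(100)]))

# The tasks are finished here, but "ps aux | grep ray::IDLE" on the node still
# lists idle worker processes; they stay alive until the Ray runtime is stopped.
time.sleep(60)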

Versions / Dependencies

ray 2.0.0

Reproduction script

No

Issue Severity

No response

matthewdeng commented 1 year ago

Do you have a repro for this?

Alxe1 commented 1 year ago

> Do you have a repro for this?

import ray
import tensorflow as tf

from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer, prepare_dataset_shard

# DeepCross, TrainReportCallback, preprocessing_data, and ray_dataset are
# defined elsewhere in my project and are not shown here.


def train_func(config):
    batch_size = config.get("batch_size", 512)
    epochs = config.get("epochs", 5)

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    with strategy.scope():
        multi_worker_model = DeepCross(
            user_num=config.get("user_num") + 1,
            item_num=config.get("item_num") + 1,
            user_item_dim=config.get("user_item_dim"),
            feature_embed_dim=config.get("feature_embed_dim"),
            embed_norm=config.get("embed_norm"),
            dnn_hidden_units=config.get("hidden_units"),
            dnn_dropout=config.get("dnn_dropout", 0.2),
            dnn_activation="relu",
            cross_num=config.get("cross_num", 3)
        )

        loss = tf.keras.losses.BinaryCrossentropy()
        optimizer = tf.keras.optimizers.Adam()
        multi_worker_model.compile(optimizer=optimizer, loss=loss, metrics=["AUC"])

    dataset = session.get_dataset_shard("train")

    def to_tf_dataset(dataset: ray.data.Dataset, batch_size: int):
        def to_tensor_iterator():
            for batch in dataset.iter_tf_batches(batch_size=batch_size, dtypes=tf.float32, drop_last=True):
                yield batch["x"], batch["is_click"]

        output_signature = (
            tf.TensorSpec(shape=(None, config.get("sparse_num")), dtype=tf.float32),
            tf.TensorSpec(shape=(None,), dtype=tf.float32)
        )
        tf_dataset = tf.data.Dataset.from_generator(to_tensor_iterator, output_signature=output_signature)

        return prepare_dataset_shard(tf_dataset)

    tf_dataset = to_tf_dataset(dataset=dataset, batch_size=batch_size)
    history = multi_worker_model.fit(tf_dataset, epochs=epochs, callbacks=[TrainReportCallback()], class_weight={0: 0.5, 1: 1})

    multi_worker_model.save("./tmp_dir", save_format="tf")
    ckpt = Checkpoint.from_directory("./tmp_dir")
    result = history.history
    session.report(result, checkpoint=ckpt) 
    return result

def train_test():
    config, dataset = preprocessing_data(dataset=ray_dataset, parquet_path=None)
    train_dataset, test_dataset = dataset.train_test_split(test_size=0.3)

    config.update({"user_item_dim": 32, "feature_embed_dim": 16, "embed_norm": 0.001, "hidden_units": [64, 32, 32],
                   "dnn_dropout": 0.2, "cross_num": 3, "epochs": 10, "batch_size": 512, "num_workers": 12, "use_gpu": False})

    num_workers = config.get("num_workers")
    use_gpu = config.get("use_gpu", False)

    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        datasets={"train": train_dataset}
    )

    result = trainer.fit()
    print(f"result.metrics: {result.metrics}")

scottsun94 commented 1 year ago

cc: @jjyao Not sure if this is related to our discussion about workers. Ray core seems to keep as many idle workers alive as there are physical cores on the node.
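A rough way to check that (a sketch, assuming psutil is installed; it relies on the workers advertising themselves as ray::IDLE in their command line, as in the screenshot above):

import psutil

# Count worker processes whose command line is set to "ray::IDLE".
idle_workers = [
    p for p in psutil.process_iter(["cmdline"])
    if p.info["cmdline"] and "ray::IDLE" in " ".join(p.info["cmdline"])
]
print(f"ray::IDLE workers: {len(idle_workers)}")
print(f"physical cores:    {psutil.cpu_count(logical=False)}")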

jjyao commented 1 year ago

Yes, we will keep some idle workers alive for future tasks to avoid the worker startup overhead. Does this behavior work for you @Alxe1?
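
If the lingering processes are a problem once a job is finished, a minimal sketch of the usual way to reclaim them is to shut the Ray runtime down explicitly at the end of the driver (or stop a local cluster with "ray stop" from the shell):

import ray

ray.init()

# ... submit tasks or run the Trainer here ...

# Shutting down the Ray runtime also terminates the cached ray::IDLE workers
# that were kept warm for future tasks.
ray.shutdown()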

hora-anyscale commented 1 year ago

Per Triage Sync: closing, as there has been no response from the reporter for 30 days.