ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Ray component: Ray Train] Non-blocking reporting of the checkpoint to maximize GPU utilization #48801

Open azayz opened 1 week ago

azayz commented 1 week ago

Description

Hello Ray team, my team and I are using Ray for training. The model we save is about 13 GB and takes around 20 minutes to upload to S3 storage; in the meantime, the GPU workers sit idle.

To maximize GPU usage, we want to do this upload in the background or asynchronously.

What is the recommended Ray way to do this? If Ray doesn't support it, could you add support for it? If this is out of scope for Ray, that's fine too.

Below is a sample of our code:

import os

import pyarrow.fs
import s3fs

s3_fs = s3fs.S3FileSystem(
    key=os.getenv('AWS_ACCESS_KEY_ID'),
    secret=os.getenv('AWS_SECRET_ACCESS_KEY'),
    endpoint_url=endpoint,
    client_kwargs=region_dict,
    max_concurrency=20,
)

# Wrap the fsspec filesystem so pyarrow (and Ray Train) can use it.
custom_fs = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(s3_fs))

In the train_func:

        time_start = time.time()
        save_deepspeed_model(trainer, ckpt_path)
        print(
            f"MIDASTOUCH: Files in the save path after custom save: {os.listdir(ckpt_path)}"
        )
        time_end = time.time()
        print(
            f"MIDASTOUCH: Time taken to save the model: {time_end - time_start} seconds"
        )

        # Report to the train session. ray.train.report persists the checkpoint
        # to the configured storage and only returns once the upload finishes,
        # which is where the ~20 minutes of GPU idle time is spent.
        checkpoint = Checkpoint.from_directory(tmpdir)
        print(
            "MIDASTOUCH: Reporting to train session / uploading the checkpoint to S3"
        )
        time_start = time.time()
        print(f"Before reporting: {checkpoint.get_metadata()}")
        ray.train.report(metrics=metrics, checkpoint=checkpoint)

        # Add a barrier to ensure all workers have finished reporting here
        trainer.strategy.barrier()
        time_end = time.time()
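
For completeness, the custom filesystem above is passed to the trainer through RunConfig's storage_filesystem; roughly like this (the trainer arguments and bucket path below are illustrative placeholders, not our exact setup):

from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer

# Point Ray Train at the S3 bucket via the custom pyarrow filesystem so that
# checkpoints passed to ray.train.report are persisted there.
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),  # placeholder
    run_config=RunConfig(
        storage_path="my-bucket/checkpoints",  # placeholder bucket/prefix
        storage_filesystem=custom_fs,
    ),
)
result = trainer.fit()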

Thank you!

Use case

No response

justinvyu commented 1 week ago

cc @hongpeng-guo

Superskyyy commented 1 week ago

I believe the async checkpointing itself can be done out of band using PyTorch's capabilities, and the artifact upload can be handed off to an extra thread so that it doesn't block the training loop (see the sketch below). Ideally, though, this would be provided as a generic Ray interface built on the object store, enabling distributed checkpointing, merging, and layered storage. I can propose a REP design for that, since we have already experimented a bit in this direction.
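
A rough sketch of the thread-based workaround (illustrative only, not an official Ray interface: the worker saves the checkpoint locally, hands the S3 upload off to a background thread, and reports only metrics so ray.train.report returns immediately; s3_fs and ckpt_path are reused from the snippet above, and the S3 destination is a placeholder):

from concurrent.futures import ThreadPoolExecutor

import ray.train

# One background worker so uploads from successive checkpoints don't pile up.
_upload_pool = ThreadPoolExecutor(max_workers=1)

def _upload_checkpoint(local_dir, s3_dest):
    # Recursively copy the local checkpoint directory to S3 with the fsspec
    # filesystem; this runs off the training thread.
    s3_fs.put(local_dir, s3_dest, recursive=True)

# Inside train_func, after save_deepspeed_model(trainer, ckpt_path):
upload_future = _upload_pool.submit(
    _upload_checkpoint, ckpt_path, "my-bucket/checkpoints/step-0"  # placeholder
)

# Report metrics only, so the call doesn't block on a checkpoint upload.
ray.train.report(metrics=metrics)

# Training continues immediately; upload_future.result() can be checked later
# to surface upload errors.

The downside is that Ray Train no longer knows about the uploaded checkpoint (for Result.checkpoint, fault tolerance, etc.), which is exactly why a built-in interface on top of the object store would be nicer.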

Superskyyy commented 1 week ago

A best-practices section somewhere in the docs would probably also be helpful to Train users.

justinvyu commented 1 week ago

@Superskyyy That would be great! Maybe you could start with a quick sketch of your proposal as a github issue?

Superskyyy commented 6 days ago

> @Superskyyy That would be great! Maybe you could start with a quick sketch of your proposal as a github issue?

Cool, you mean in the main Ray repo, right? Not in the REP repo.

justinvyu commented 6 days ago

@Superskyyy Yep, just in the Ray repo for now.