mosaicml / streaming

A Data Streaming Library for Efficient Neural Network Training
https://streaming.docs.mosaicml.com
Apache License 2.0
1.03k stars 126 forks source link

GPU utilisation drop between epochs #643

Open rishabhm12 opened 3 months ago

rishabhm12 commented 3 months ago

Hi Team, I am training a model for retail usecase and I have three streaming data loaders which are interleaving (multitask setup). After the first epoch, I see gpu utilisation drop. How can I make the utilisation close to 100%? I observe the same behaviour if I load from local_disk too, by providing remote=None and local = local_path for all the three data loaders. When I just train with one data loader which loads from local_disk, then I do not observe this behaviour. Ideally, I want the data to be streamed from remote location (gcs in this case) with a util close to 100%. This leads to a lot of ideal time and high training cost.

Screenshot 2024-03-29 at 7 56 24 PM

I also tried playing with the predownload param but there's no improvement as such .

knighton commented 3 months ago

Cycling/interleaving/etc multiple StreamingDatasets/StreamingDataLoaders has the potential to result in complicated situations when it comes to coordination.

Instead, why not just use Streams? The sample space of a StreamingDataset is the concatenation of one or more Streams, which correspond to a serialized Streaming dataset directory, i.e. they are sub-datasets.

(Well, technically, if you use StreamingDataset you are already using Streams, SD just creates a single Stream implicitly behind the scenes and hands various arguments off to it -- if you don't provide Streams explicitly.)

from streaming import Stream, StreamingDataset

first = Stream(local=..., remote=...)
second = Stream(local=..., remote=...)
dataset = StreamingDataset(streams=[first, second], batch_size=...)
snarayan21 commented 3 months ago

@rishabhm12 You should also make sure to set persistent_workers=True in the DataLoader so that workers are not shut down after each epoch, and the workers' dataset instances will stay alive. More info here

EDIT: using the Composer launcher instead of TorchDistributor and not setting persistent_workers=True seems to address the problem in my testing.

rishabhm12 commented 3 months ago

@knighton the same behaviour persists with even a single data loader when streaming from remote. @snarayan21 will keeping the workers alive completely solve for the util drop or will only slightly improve the downtime (from 30 mins to 25 mins), have you tried by changing the argument? I was of the opinion, this behaviour is when forward + back prop << time taken to download shards from remote to local.

snarayan21 commented 3 months ago

@rishabhm12 This should solve the utilization drop if the issue was re-creating the worker StreamingDatasets. As I don't have your script, I don't know the exact improvement it will give you, but we've seen this in the past and it has addressed the problem. Also, since your job is continuing to run on the same set of GPUs/nodes, StreamingDataset is smart about partitioning shard downloads between epochs, so no shard files will have to be downloaded between epochs.

Let us know if setting persistent_workers=True helps, and if not, happy to discuss further. A look into your training script would be useful as well.

rishabhm12 commented 3 months ago

@snarayan21 we did try setting the persistent_workers=True but this did not help. Attaching the graph fyr, there's always a 30 min drop after each epoch ![Uploading Screenshot 2024-04-03 at 10.25.47 AM.png…]()

snarayan21 commented 3 months ago

@rishabhm12 Ah that's not good, mind sending over a version of your training script we can repro? Would love to get to the bottom of this. Also, I don't think your screenshot uploaded fully 😅

Thanks!

miguelalba96 commented 3 months ago

Same issue of utilisation here, I train on GCP using 4 nodes (n1-highmem-16) on GCP each with 2 (V100) GPUs , image

First 2 nodes are busy at 98-99% utilisation (non-stop) while the others fluctuate, so I assume the GPUs are waiting for data to be ready

GCP Vertex AI component uses GCSFuse treating the buckets as local file system, so I am not "streaming" the data

Batch size per device (GPU) is 64

what do you recommend? I was thinking using prefetch_factor > 2 or persistent_workers=True

snarayan21 commented 3 months ago

@miguelalba96 @rishabhm12 Can you make sure that the correct batch_size is being passed to both StreamingDataset and the DataLoader? This batch size should be per-device. If that's correct, then can you try setting the StreamingDataset's predownload parameter higher? All this while making sure that persistent_workers=True in your DataLoader.

Any minimal repro script either of you could provide so we can debug this would be greatly appreciated!

smilenaderi commented 3 months ago

I;m having the same issue too. @snarayan21 assigning persistant worker didn't help

rishabhm12 commented 3 months ago

@snarayan21 I am passing the local batch size to StreamingDatasets. I have shared the scripts with databricks team, they will get in touch with you

smilenaderi commented 3 months ago

@snarayan21 @knighton Here is my code. I have only one source of stream: After each epoch I have around 10 mins of idle state. Then it resumes to giving batches. After some point it crashes. It happens on multi gpu

def clean_previous_database(local):
#     try:
#         shutil.rmtree(local)
#     except:
#         pass
    clean_stale_shared_memory()

def make_get_batch(batch_size, remote, local):
    setup_google_cloud()
    clean_previous_database(local)
    ds = StreamingDataset(remote=remote, local=local, batch_size=batch_size)
    dl = DataLoader(ds, batch_size=batch_size, collate_fn=lambda x: x)
    it = iter(dl)

    def f():
        return next(it)
    def clean():
        clean_previous_database(local)
    return f, ds, clean, dl
class DataAcess():

    def __init__(self, device_id, batch_size, dataset_name, table_name,persistent_workers=False,num_workers=0):
        print(f'Data Access Loading Started on GPU {device_id}')

        self.get_batch, self.ds, self.clean, self.dl = mk_get_batch(dataset_name, table_name,batch_size=batch_size)
#         Define your dataloader:
#         self.dl = DataLoader(self.ds, batch_size=batch_size, collate_fn=lambda x: x, persistent_workers=persistent_workers,num_workers=num_workers)
        print(f'Data Access Loading Finished on GPU {device_id}')

def training_loop(dist, rank):

    start_time = time.time()
    device_id = rank % torch.cuda.device_count()
    master_process = rank == 0
    batch_size = 64
    NUM_EPOCH = 10
    tensor_max_len = 300
    num_workers = 1

    dataAccess = DataAcess(device_id,batch_size, dataset_name, table_name)
    print(f' Data Access Loading Time: {time.time() - start_time} rank {rank} ')

    start_time = time.time()
    query_count = 0

    for epoch in range(NUM_EPOCH):
        with tqdm(desc = f'EPOCH {epoch} QPS on master GPU ', disable=not master_process, unit=" item") as pbar:

            for batch in dataAccess.dl:
                tensor = batch_to_tensor(batch, tensor_max_len).cuda(device_id)
                query_count += tensor.size(0)
                len_batch = tensor.size(0)
                pbar.update(len_batch)
            dataAccess.clean()
        print(f'\nEPOCH {epoch} QPS on GPU {rank}  : {query_count / (time.time() - start_time)}\n')
rishabhm12 commented 3 months ago

And I think wait between epochs is also batch_size dependent. The higher the batch_size, higher is the wait time

smilenaderi commented 3 months ago

@rishabhm12 Yes, and it only happens on multi GPU. on single gpu it is ok

I'm using AMD ROCM rccl

snarayan21 commented 3 months ago

@rishabhm12 @smilenaderi Are both of you using pre-processing functions for each sample/batch before training? Also, how big are your batch sizes and epoch sizes?

snarayan21 commented 3 months ago

@miguelalba96 In the past we've seen that treating GCSFuse as "local" can be slow. Have you tried treating it as remote, or moving your data to local disk?

rishabhm12 commented 3 months ago

@snarayan21 yeah there's lite preprocessing happening, basically a lookup (O(1)) and converting np arr to torch tensors. local_batch_size in my case is 512 and global is 4096, I train for 14 epochs each epoch having ~24300 steps

Matagi1996 commented 3 months ago

You could log your actual disk read/write speed and see if your Dataloaders are IO bound, i had this issue as well when loading big images from local drive while also croping them (thowing alot of stuff away right away) while still wanting big batch sizes. Increesing Numbers of workers even decreesed read MB/Sec because (i guess) the head of my harddrive now had to jump between locations.

miguelalba96 commented 3 months ago

@miguelalba96 In the past we've seen that treating GCSFuse as "local" can be slow. Have you tried treating it as remote, or moving your data to local disk?

I transferred the data locally (1.1TB) to the instance and the problem of low GPU utilisation was solved. It's cheaper to pay the extra for TB of SSD storage than GPU hours

snarayan21 commented 3 months ago

@miguelalba96 some things you could try, given that local disk works well instead of FUSE-mounted:

let me know if that helps!

snarayan21 commented 3 months ago

I am curious, what launchers are people using? I have reproduced the issue of low utilization between epochs when using TorchDistributor, but the issue goes away with the Composer launcher.