mlfoundations / open_clip

An open source implementation of CLIP.

Horovod Error #791

Closed. LuisBlanche closed this issue 5 months ago.

LuisBlanche commented 6 months ago

Context

Trying to fine-tune pretrained CLIP models on Databricks using Horovod.

Using the following parameters (a sketch of the expected CSV layout follows the list):

accum_freq: 1
aug_cfg: {}
batch_size: 128
beta1: 0.9
beta2: 0.98
checkpoint_path: ./logs/2024_01_11-13_29_20-model_ViT-B-16-SigLIP-lr_5e-05-b_128-j_4-p_amp/checkpoints
coca_caption_loss_weight: 2.0
coca_contrastive_loss_weight: 1.0
copy_codebase: False
csv_caption_key: caption
csv_img_key: image_path
csv_separator: ,
dataset_resampled: False
dataset_type: auto
ddp_static_graph: False
debug: False
delete_previous_checkpoint: False
device: cuda:0
dist_backend: nccl
dist_url: env://
distill: False
distill_model: None
distill_pretrained: None
distributed: True
epochs: 20
epochs_cooldown: None
eps: 1e-06
force_custom_text: False
force_image_size: None
force_patch_dropout: None
force_quick_gelu: False
gather_with_grad: False
grad_checkpointing: True
grad_clip_norm: None
horovod: True
image_interpolation: None
image_mean: None
image_resize_mode: None
image_std: None
imagenet_v2: None
imagenet_val: None
local_loss: False
local_rank: 0
lock_image: False
lock_image_freeze_bn_stats: False
lock_image_unlocked_groups: 0
lock_text: False
lock_text_freeze_layer_norm: False
lock_text_unlocked_layers: 0
log_every_n_steps: 100
log_level: 20
log_local: False
log_path: ./logs/2024_01_11-13_29_20-model_ViT-B-16-SigLIP-lr_5e-05-b_128-j_4-p_amp/out.log
logs: ./logs/
lr: 5e-05
lr_cooldown_end: 0.0
lr_cooldown_power: 1.0
lr_scheduler: cosine
model: ViT-B-16-SigLIP
name: 2024_01_11-13_29_20-model_ViT-B-16-SigLIP-lr_5e-05-b_128-j_4-p_amp
no_set_device_rank: False
precision: amp
pretrained: webli
pretrained_image: False
rank: 0
remote_sync: s3://***/***/clip/training_sync
remote_sync_frequency: 180
remote_sync_protocol: s3
report_to: ['wandb']
resume: None
save_frequency: 1
save_most_recent: False
seed: 0
siglip: False
skip_scheduler: False
tensorboard: False
tensorboard_path:  
test_data: /tmp/CLIP/***/metadata/test.csv
torchcompile: False
torchscript: False
trace: False
train_data: /tmp/CLIP/***/metadata/train.csv
train_data_upsampling_factors: None
train_num_samples: None
use_bn_sync: False
use_bnb_linear: None
val_data: /tmp/CLIP/***/metadata/val.csv
val_frequency: 1
val_num_samples: None
wandb: False
wandb_notes: 
wandb_project_name: open-clip
warmup: 0
wd: 0.1
workers: 4
world_size: 1
zeroshot_frequency: 10
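
For context: dataset_type is auto and the data paths end in .csv, so I expect this to resolve to the CSV loader. A hypothetical file matching the csv_* settings above would look roughly like the following (the path below is a placeholder, not my real one):

import pandas as pd

# Hypothetical illustration only: with csv_separator ",", csv_img_key "image_path"
# and csv_caption_key "caption", each metadata CSV is expected to provide at least
# these two columns. "metadata/train.csv" is a placeholder path.
df = pd.read_csv("metadata/train.csv", sep=",")
print(df[["image_path", "caption"]].head())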

PROBLEM

I get the following runtime error when the code tries to instantiate the DistributedSampler:

File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/open_clip/training/data.py:564, in get_csv_dataset(args, preprocess_fn, is_train, epoch, tokenizer)
    561 num_samples = len(dataset)
    563 if args.distributed and is_train:
--> 564     sampler = DistributedSampler(dataset)

File /databricks/python/lib/python3.10/site-packages/torch/utils/data/distributed.py:68, in DistributedSampler.__init__(self, dataset, num_replicas, rank, shuffle, seed, drop_last)
     66     if not dist.is_available():
     67         raise RuntimeError("Requires distributed package to be available")
---> 68     num_replicas = dist.get_world_size()
     69 if rank is None:
     70     if not dist.is_available():

File /databricks/python/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py:1067, in get_world_size(group)
   1064 if _rank_not_in_group(group):
   1065     return -1
-> 1067 return _get_group_size(group)

File /databricks/python/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py:453, in _get_group_size(group)
    449 """
    450 Helper that gets a given group's world size.
    451 """
    452 if group is GroupMember.WORLD or group is None:
--> 453     default_pg = _get_default_group()
    454     return default_pg.size()
    455 return group.size()

File /databricks/python/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py:584, in _get_default_group()
    580 """
    581 Getting the default process group created by init_process_group
    582 """
    583 if not is_initialized():
--> 584     raise RuntimeError(
    585         "Default process group has not been initialized, "
    586         "please make sure to call init_process_group."
    587     )
    588 return GroupMember.WORLD

RuntimeError: Default process group has not been initialized, please make sure to call init_process_group.

I can see that, indeed, when using Horovod the init_process_group method is never called, cf. https://github.com/mlfoundations/open_clip/blob/3ff1faf10b60be27252be7f6c84ce7c8c5e14ec8/src/training/distributed.py#L63-L114

It is only called when Horovod is not used. I don't know Horovod or distributed training well enough to know whether this is the expected behavior!
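
For reference, the same error reproduces in isolation (a minimal sketch, independent of open_clip): when num_replicas and rank are not passed, DistributedSampler falls back to torch.distributed lookups, which require init_process_group to have been called.

import torch
import torch.distributed as dist
from torch.utils.data import DistributedSampler, TensorDataset

dataset = TensorDataset(torch.arange(10))

print(dist.is_available())    # typically True: the distributed package is built in
print(dist.is_initialized())  # False: init_process_group() was never called

# Same failure as in the traceback above: with num_replicas=None and rank=None,
# the sampler calls dist.get_world_size() / dist.get_rank(), which need the
# default process group.
try:
    DistributedSampler(dataset)
except RuntimeError as e:
    print(e)  # "Default process group has not been initialized, ..."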

rwightman commented 6 months ago

@LuisBlanche if you use horovod you can only use webdataset; otherwise you have to write your own DistributedSampler. We never found much benefit to using horovod over distributed torch training...

rwightman commented 6 months ago

hmm, if code is added to check args.horovod in the CSV dataset create fn, you could get the world details (rank & world_size) from hvd and then explicitly pass them to DistributedSampler so that it doesn't try to look them up from torch.dist. For reference, the relevant signature (with a rough sketch of the change after it):

class DistributedSampler(Sampler[T_co]):
    r"""Sampler that restricts data loading to a subset of the dataset.

    It is especially useful in conjunction with
    :class:`torch.nn.parallel.DistributedDataParallel`. In such a case, each
    process can pass a :class:`~torch.utils.data.DistributedSampler` instance as a
    :class:`~torch.utils.data.DataLoader` sampler, and load a subset of the
    original dataset that is exclusive to it.

    .. note::
        Dataset is assumed to be of constant size and that any instance of it always
        returns the same elements in the same order.

    Args:
        dataset: Dataset used for sampling.
        num_replicas (int, optional): Number of processes participating in
            distributed training. By default, :attr:`world_size` is retrieved from the
            current distributed group.
        rank (int, optional): Rank of the current process within :attr:`num_replicas`.
            By default, :attr:`rank` is retrieved from the current distributed
            group.
        shuffle (bool, optional): If ``True`` (default), sampler will shuffle the
            indices.
        seed (int, optional): random seed used to shuffle the sampler if
            :attr:`shuffle=True`. This number should be identical across all
            processes in the distributed group. Default: ``0``.
        drop_last (bool, optional): if ``True``, then the sampler will drop the
            tail of the data to make it evenly divisible across the number of
            replicas. If ``False``, the sampler will add extra indices to make
            the data evenly divisible across the replicas. Default: ``False``.

    .. warning::
        In distributed mode, calling the :meth:`set_epoch` method at
        the beginning of each epoch **before** creating the :class:`DataLoader` iterator
        is necessary to make shuffling work properly across multiple epochs. Otherwise,
        the same ordering will be always used.

    Example::

        >>> # xdoctest: +SKIP
        >>> sampler = DistributedSampler(dataset) if is_distributed else None
        >>> loader = DataLoader(dataset, shuffle=(sampler is None),
        ...                     sampler=sampler)
        >>> for epoch in range(start_epoch, n_epochs):
        ...     if is_distributed:
        ...         sampler.set_epoch(epoch)
        ...     train(loader)
    """

    def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
                 rank: Optional[int] = None, shuffle: bool = True,
                 seed: int = 0, drop_last: bool = False) -> None:
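
Something along these lines could work; an untested sketch of a possible change inside get_csv_dataset (it assumes Horovod is importable as hvd, and that args, is_train, and dataset are the names already in scope there):

import horovod.torch as hvd
from torch.utils.data import DistributedSampler

# Untested sketch: when running under Horovod, pass the world details explicitly
# so DistributedSampler never queries the (uninitialized) torch.distributed group.
if args.distributed and is_train:
    if args.horovod:
        sampler = DistributedSampler(
            dataset,
            num_replicas=hvd.size(),  # world size from Horovod
            rank=hvd.rank(),          # global rank from Horovod
        )
    else:
        sampler = DistributedSampler(dataset)
else:
    sampler = None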

LuisBlanche commented 6 months ago

Thanks for the quick reply! If I work with a multi-GPU machine (only one node), for instance, will the code automatically use torch distributed training? If I understand correctly, on a non-SLURM cluster we would need to have a "WORLD_SIZE" environment variable preset?

rwightman commented 6 months ago

See https://github.com/mlfoundations/open_clip?tab=readme-ov-file#single-node. If you use either the single- or multi-node (both are multi-GPU) torchrun commands in the README, all those vars are set and the torch.dist functions work.
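
As a quick sanity check (illustrative snippet, not from the repo), you can print the variables a torchrun worker should see; the env:// init method configured above (dist_url: env://) reads MASTER_ADDR, MASTER_PORT, RANK, and WORLD_SIZE, and LOCAL_RANK is typically used to pick the local GPU.

import os

# Illustrative check: torchrun exports these for every worker process it launches.
for name in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE", "LOCAL_RANK"):
    print(f"{name}={os.environ.get(name, '<not set>')}")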