ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

StreamSplitDataIterator(epoch=-1, split=0) blocked waiting on other clients for more than 30s. #42008

Open afogarty85 opened 9 months ago

afogarty85 commented 9 months ago

What happened + What you expected to happen

Reopening this issue: https://github.com/ray-project/ray/issues/41973. I have included additional comparative scripts in which the only difference between the two is the use of Ray Data.

The reproduction script appears to expose several bugs:

  1. Jobs won't start training when using multiple workers, likely owing to the complaints from the StreamSplitDataIterator. The data and model that I am using are trivially small (alpaca), but it's unclear what is delaying training. The CPUs sit at around 20% usage for 30 minutes until the timeout.

    (SplitCoordinator pid=2108, ip=10.0.0.18) StreamSplitDataIterator(epoch=-1, split=0) blocked waiting on other clients for more than 30s. All clients must read from the DataIterator splits at the same time. This warning will not be printed again for this epoch.
    (RayTrainWorker pid=1856, ip=10.0.0.21) 
    0%|          | 0/38000 [00:00<?, ?it/s] [repeated 7x across cluster]
    (RayTrainWorker pid=2300) [E ProcessGroupNCCL.cpp:828] [Rank 7] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=26, OpType=BROADCAST, Timeout(ms)=1800000) ran for 1800251 milliseconds before timing out.
  2. This is perhaps a DeepSpeed issue when trying to use "auto" parameters, but it is worth mentioning, and it only seems to cause problems with ZeRO stage 3 rather than stage 2. Edit: I am not sure which component is to blame here, but the world size is not being communicated to DeepSpeed. Here it thinks the world size is 1 with ZeRO stage 3.

    AssertionError: Check batch related parameters. train_batch_size is not equal to micro_batch_per_gpu * gradient_acc_step * world_size 32 != 4 * 1 * 1
  3. If no function is passed to collate_fn in iter_torch_batches, we get this error. Interestingly, a pass-through function (one that returns its input unchanged) passes this check. I could get this far training on a single worker.

TypeError: can't convert np.ndarray of type numpy.str_. The only supported types are: float64, float32, float16, complex64, complex128, int64, int32, int16, int8, uint8, and bool.
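
The assertion in (2) can be checked by hand. The following is a minimal sketch in plain Python that assumes only the identity quoted in the error message; `expected_train_batch_size` and `implied_world_size` are hypothetical helper names used for illustration, not DeepSpeed APIs.

```python
def expected_train_batch_size(micro_batch_per_gpu, gradient_acc_steps, world_size):
    """Global batch size implied by the identity in the assertion message."""
    return micro_batch_per_gpu * gradient_acc_steps * world_size


def implied_world_size(train_batch_size, micro_batch_per_gpu, gradient_acc_steps):
    """World size that would make the identity hold for the given batch sizes."""
    per_rank = micro_batch_per_gpu * gradient_acc_steps
    if train_batch_size % per_rank != 0:
        raise ValueError("train_batch_size is not a multiple of the per-rank batch")
    return train_batch_size // per_rank


# Values taken from the assertion message: 32 != 4 * 1 * 1
seen_by_deepspeed = expected_train_batch_size(4, 1, world_size=1)
needed_world_size = implied_world_size(32, 4, 1)
print(seen_by_deepspeed, needed_world_size)
```

With the reported values, a world size of 1 yields 4 rather than 32, while a world size of 8 would satisfy the check. That would be consistent with the `[Rank 7]` entry in the NCCL log under (1), though the report does not state how many workers that run used.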

Versions / Dependencies

ray[air]==2.8.1
trl==0.7.4
deepspeed==0.12.5
accelerate==0.25.0
transformers==4.36.1

Reproduction script

Multi-Worker Working Example

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
)
import torch
import ray
import numpy as np
from trl import SFTTrainer
from transformers import TrainingArguments
from datasets import load_dataset, interleave_datasets
from azureml.core import Run
from ray.train.huggingface.transformers import prepare_trainer, RayTrainReportCallback
from ray.train.torch import TorchTrainer, TorchConfig
from ray.train import RunConfig, ScalingConfig, CheckpointConfig, DataConfig, Checkpoint
import os
torch.backends.cuda.matmul.allow_tf32 = True

def collate_fn(examples):
    return examples

def train_func(config):
    # tf32
    torch.backends.cuda.matmul.allow_tf32 = True

    deepspeed = {
        "fp16": {
            "enabled": "auto"
        },
        "bf16": {
            "enabled": "auto"
        },
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": True
            },
            "offload_param": {
                "device": "cpu",
                "pin_memory": True
            },
            "overlap_comm": True,
            "contiguous_gradients": True,
            "sub_group_size": 1e9,
            "reduce_bucket_size": 5e8,
            "stage3_prefetch_bucket_size": 5e8,
            "stage3_param_persistence_threshold": 1e6,
            "stage3_max_live_parameters": 1e9,
            "stage3_max_reuse_distance": 1e9,
            "stage3_gather_16bit_weights_on_model_save": True,
            "round_robin_gradients": True
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 10,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False
    }

    tokenizer = AutoTokenizer.from_pretrained(
        "facebook/opt-350m",
        add_eos_token=False,
        add_bos_token=False,
        use_fast=True,
    )
    tokenizer.pad_token = tokenizer.eos_token

    train_ds_iterable = load_dataset("tatsu-lab/alpaca", split="train")

    def formatting_func(example):
        text = f"{example['instruction']}\n {example['text']}"
        return text

    args = TrainingArguments(
            logging_steps=1,
            save_strategy='steps',
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            gradient_checkpointing=False,
            gradient_accumulation_steps=1,
            learning_rate=5e-4,
            max_steps=38000,
            weight_decay=0.01,
            push_to_hub=False,
            save_steps=1000,
            report_to='none',
            lr_scheduler_type="constant",
            output_dir="/mnt/mypath",
            fp16=False,
            bf16=False,
            tf32=False,
            #deepspeed=deepspeed,           
        )

    # SFT
    trainer = SFTTrainer(
        model="facebook/opt-350m",
        model_init_kwargs={
        "torch_dtype": "auto",
        "trust_remote_code": True,
    },
        tokenizer=tokenizer,
        train_dataset=train_ds_iterable,
        formatting_func=formatting_func,
        max_seq_length=2048,
        neftune_noise_alpha=5,
        packing=True,
        args=args,
    )

    trainer.add_callback(RayTrainReportCallback())

    trainer = prepare_trainer(trainer)

    print("Starting training")
    trainer.train()

batch_size = 2

# trainer
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=3, use_gpu=True),
)

result = trainer.fit()

Multi-Worker Failing Example

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
)
import torch
import ray
import numpy as np
from trl import SFTTrainer
from transformers import TrainingArguments
from datasets import load_dataset, interleave_datasets
from azureml.core import Run
from ray.train.huggingface.transformers import prepare_trainer, RayTrainReportCallback
from ray.train.torch import TorchTrainer, TorchConfig
from ray.train import RunConfig, ScalingConfig, CheckpointConfig, DataConfig, Checkpoint
import os
torch.backends.cuda.matmul.allow_tf32 = True

def collate_fn(examples):
    return examples

def train_func(config):
    # tf32
    torch.backends.cuda.matmul.allow_tf32 = True

    deepspeed = {
        "fp16": {
            "enabled": "auto"
        },
        "bf16": {
            "enabled": "auto"
        },
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": True
            },
            "offload_param": {
                "device": "cpu",
                "pin_memory": True
            },
            "overlap_comm": True,
            "contiguous_gradients": True,
            "sub_group_size": 1e9,
            "reduce_bucket_size": 5e8,
            "stage3_prefetch_bucket_size": 5e8,
            "stage3_param_persistence_threshold": 1e6,
            "stage3_max_live_parameters": 1e9,
            "stage3_max_reuse_distance": 1e9,
            "stage3_gather_16bit_weights_on_model_save": True,
            "round_robin_gradients": True
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 10,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False
    }

    tokenizer = AutoTokenizer.from_pretrained(
        "facebook/opt-350m",
        add_eos_token=False,
        add_bos_token=False,
        use_fast=True,
    )
    tokenizer.pad_token = tokenizer.eos_token

    train_ds = ray.train.get_dataset_shard("train")
    train_ds_iterable = train_ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn, prefetch_batches=1,)

    def formatting_func(example):
        text = f"{example['instruction']}\n {example['text']}"
        return text

    args = TrainingArguments(
            logging_steps=1,
            save_strategy='steps',
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            gradient_checkpointing=False,
            gradient_accumulation_steps=1,
            learning_rate=5e-4,
            max_steps=38000,
            weight_decay=0.01,
            push_to_hub=False,
            save_steps=1000,
            report_to='none',
            lr_scheduler_type="constant",
            output_dir="/mnt/mypath",
            fp16=False,
            bf16=False,
            tf32=False,
            #deepspeed=deepspeed,           
        )

    # SFT
    trainer = SFTTrainer(
        model="facebook/opt-350m",
        model_init_kwargs={
        "torch_dtype": "auto",
        "trust_remote_code": True,
    },
        tokenizer=tokenizer,
        train_dataset=train_ds_iterable,
        formatting_func=formatting_func,
        max_seq_length=2048,
        neftune_noise_alpha=5,
        packing=True,
        args=args,
    )

    trainer.add_callback(RayTrainReportCallback())

    trainer = prepare_trainer(trainer)

    print("Starting training")
    trainer.train()

# data
train_ds = load_dataset("tatsu-lab/alpaca", split="train")
ray_ds = ray.data.from_huggingface(train_ds)

batch_size = 2

# trainer
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=3, use_gpu=True),
    datasets={
        "train": ray_ds,
    },
    dataset_config=DataConfig(datasets_to_split=["train"]),
)

result = trainer.fit()

Issue Severity

Low: It annoys or frustrates me.

raulchen commented 8 months ago

For (1), the warning message is not the cause; it is saying that some training worker(s) are not consuming data. It's worth checking the health of the training workers, e.g., whether they are still alive and what their stack traces show. For (2), cc @woshiyyya to confirm whether this is an issue in Ray Train. For (3), this looks like a duplicate of https://github.com/ray-project/ray/issues/40960
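
One way to capture stack traces from a worker that appears stuck is Python's standard `faulthandler` module. The sketch below is an assumption about how this could be wired in, not something taken from this thread: `dump_stacks_if_stuck` is a hypothetical helper, and it assumes the worker's stderr is backed by a real file descriptor.

```python
import faulthandler
import sys
from contextlib import contextmanager


@contextmanager
def dump_stacks_if_stuck(timeout_s=60.0, file=None):
    """Dump the stacks of all threads every `timeout_s` seconds while the body runs.

    `file` must expose fileno(); it defaults to the process's stderr.
    """
    target = file if file is not None else sys.stderr
    faulthandler.dump_traceback_later(timeout_s, repeat=True, file=target)
    try:
        yield
    finally:
        # Stop the watchdog once the wrapped block finishes or raises.
        faulthandler.cancel_dump_traceback_later()
```

Inside the training function, wrapping the call as `with dump_stacks_if_stuck(120): trainer.train()` would print where each worker is blocked if the call has not returned after two minutes, which should show whether a worker is waiting on data or on a collective operation.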

matthewdeng commented 8 months ago

For (2), I believe the root issue is https://github.com/microsoft/DeepSpeed/issues/3228, though it's not clear why the issue is happening again.

jieguangzhou commented 6 months ago

In my case, the cause may be that when trl uses packing, it wraps the data in a ConstantLengthDataset whose length is dynamic, which leads to inconsistent data lengths on different nodes. I hope that helps.

You can use packing=False or pass the dataset directly to the training function.
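
To illustrate why packing can give workers different numbers of steps, here is a simplified sketch in plain Python. It is not trl's ConstantLengthDataset implementation; `pack_constant_length`, `count_packed_sequences`, and the two toy shards are hypothetical, and the sketch assumes that packing concatenates tokens and cuts them into fixed-length sequences.

```python
def pack_constant_length(token_lists, seq_length):
    """Concatenate token lists and yield fixed-length chunks (leftover tokens are dropped)."""
    buffer = []
    for tokens in token_lists:
        buffer.extend(tokens)
        while len(buffer) >= seq_length:
            yield buffer[:seq_length]
            buffer = buffer[seq_length:]


def count_packed_sequences(token_lists, seq_length):
    """Number of packed sequences a shard produces."""
    return sum(1 for _ in pack_constant_length(token_lists, seq_length))


# Two toy shards with the same number of examples but different text lengths.
shard_a = [[0] * 10 for _ in range(4)]  # 40 tokens in total
shard_b = [[0] * 25 for _ in range(4)]  # 100 tokens in total

steps_a = count_packed_sequences(shard_a, seq_length=16)
steps_b = count_packed_sequences(shard_b, seq_length=16)
print(steps_a, steps_b)
```

Both shards hold four examples, yet they produce different numbers of packed sequences. If each worker packs its own shard, the workers can run out of data at different times, so one worker stops reading while the others are still waiting on it.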

genesis-jamin commented 6 months ago

@afogarty85 were you able to resolve the NCCL timeout error?

SharlynOUO commented 3 weeks ago

Has anyone solved the first problem yet? I have the same problem and I don't know how to solve it.


code

#!/usr/bin/env python
# coding: utf-8

# set up ray env

# In[1]:
import ray
import os 
os.environ['RAY_AIR_RICH_LAYOUT'] = '1'
ray.init(runtime_env={"env_vars": {"RAY_AIR_RICH_LAYOUT": "1"}})

# In[2]:
ray.available_resources()
# In[5]:
import ray.data as data
ray_datasets = data.read_csv("/opt/data/train.csv").train_test_split(test_size=0.25)
ray_datasets = {"train": ray_datasets[0], "validation": ray_datasets[1]}
ray_datasets

# In[6]:

model_name = "openai-community/gpt2"
# In[7]:

import numpy as np
from transformers import GPT2Tokenizer

# def tokenize(batch):
#     tokenizer = GPT2Tokenizer.from_pretrained("openai-community/gpt2")
#     tokenizer.add_special_tokens({"pad_token": tokenizer.eos_token})

#     x = np.array([batch['格式'],batch['诗歌']])
#     x = np.transpose(x)
#     x = [tokenizer.bos_token+r[0]+":"+r[1]+tokenizer.eos_token for r in x]
#     batch_tokens = tokenizer(
#         x,
#         truncation=True,  
#         max_length=128,
#         padding="max_length",  
#         return_tensors="np",
#     )
#     batch_tokens['labels'] = batch_tokens['input_ids'].copy()
#     return dict(batch_tokens)

# processed_datasets = {
#     key: (
#         ds.map_batches(tokenize, batch_format="pandas")
#     )
#     for key, ds in ray_datasets.items()
# }
# processed_datasets

# In[9]:

import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer, TrainingArguments, Trainer, default_data_collator
from transformers.utils.logging import disable_progress_bar, enable_progress_bar

from ray import train
from ray.train.huggingface.transformers import prepare_trainer, RayTrainReportCallback

import os

# def drop_last_iterable(ray_batch, batch_size):
#     for batch in ray_batch:
#         if batch.count() == batch_size:
#             yield batch

def train_fn(config):
    os.environ["OMP_NUM_THREADS"] = str(
        train.get_context().get_trial_resources().bundles[-1].get("CPU", 1)
    )
    torch.backends.cuda.matmul.allow_tf32 = True

    batch_size = config.get("batch_size", 4)
    epochs = config.get("epochs", 2)
    warmup_steps = config.get("warmup_steps", 0)
    learning_rate = config.get("learning_rate", 0.00002)
    weight_decay = config.get("weight_decay", 0.01)
    steps_per_epoch = config.get("steps_per_epoch")

    # print(batch_size,epochs,warmup_steps,learning_rate)
    deepspeed = {
        "fp16": {
            "enabled": "auto",
            "initial_scale_power": 8,
            "hysteresis": 4,
            "consecutive_hysteresis": True,
        },
        "bf16": {"enabled": "auto"},
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "betas": "auto",
                "eps": "auto",
            },
        },
        # "zero_optimization": {
        #     "stage": 3,
        #     "offload_optimizer": {
        #         "device": "cpu",
        #         "pin_memory": True,
        #     },
        #     "overlap_comm": True,
        #     "contiguous_gradients": True,
        #     "reduce_bucket_size": "auto",
        #     "stage3_prefetch_bucket_size": "auto",
        #     "stage3_param_persistence_threshold": "auto",
        #     "gather_16bit_weights_on_model_save": True,
        #     "round_robin_gradients": True,
        # },
        "zero_optimization": {
            "stage": 2,  
            "offload_optimizer": {  
                "device": "cpu", 
                "pin_memory": True, 
            },
            "overlap_comm": True, 
            "contiguous_gradients": True,  
            "reduce_bucket_size": "auto",  
            "gather_16bit_weights_on_model_save": True,  
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 10,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False,
        "checkpoint": {
            "checkpoint_path": "/opt/data/models/ray-ds-gpt2",  
            "save_interval": 00,  
            "save_limit": 3        
        },
    }

    print("Preparing training arguments")
    training_args = TrainingArguments(
        "output",
        logging_steps=1,
        save_strategy="steps",
        save_steps=steps_per_epoch,
        max_steps=steps_per_epoch * epochs,
        per_device_train_batch_size=batch_size,
        gradient_accumulation_steps=1,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_steps=warmup_steps,
        label_names=["input_ids", "attention_mask"],
        push_to_hub=False,
        report_to="none",
        disable_tqdm=False,  
        fp16=True,
        gradient_checkpointing=True,
        deepspeed=deepspeed,
    )

    disable_progress_bar()

    print("Loading model")

    model = GPT2LMHeadModel.from_pretrained(model_name).to("cuda")

    print("Model loaded")

    enable_progress_bar()

    print("Loading dataset")

    tokenizer = GPT2Tokenizer.from_pretrained("openai-community/gpt2")
    tokenizer.add_special_tokens({"pad_token": tokenizer.eos_token})

    def tokenize(batch, tokenizer = tokenizer):

        x = np.array([batch['格式'],batch['诗歌']])
        x = np.transpose(x)
        x = [tokenizer.bos_token+r[0]+":"+r[1]+tokenizer.eos_token for r in x]
        batch_tokens = tokenizer(
            x,
            truncation=True,  
            max_length=128,
            padding="max_length",  
            return_tensors="np",
        )
        batch_tokens['labels'] = batch_tokens['input_ids'].copy()
        return dict(batch_tokens)

    train_ds = train.get_dataset_shard("train")
    valid_ds = train.get_dataset_shard("validation")

    train_ds_iterable = train_ds.iter_torch_batches(
            batch_size=batch_size,
            collate_fn=tokenize,
            local_shuffle_buffer_size=train.get_context().get_world_size() * batch_size)
    valid_ds_iterable = valid_ds.iter_torch_batches(batch_size=batch_size,collate_fn=tokenize)

    print("Dataset loaded")

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_ds_iterable,
        eval_dataset=valid_ds_iterable,
        # tokenizer=tokenizer,
        data_collator=default_data_collator,
    )

    # for bt in train_ds_iterable:
    #     print(bt)
    #     break

    # Add callback to report checkpoints to Ray Train
    trainer.add_callback(RayTrainReportCallback())
    trainer = prepare_trainer(trainer)

    print("Start training")

    trainer.train()

    print("Training finished")

# In[10]:

storage_path = "/opt/data/models/ray-ds-gpt2"
batch_size_per_worker = 8
num_workers = 4 
gpus_per_worker = 1
cpus_per_worker = 2
train_ds_size = ray_datasets['train'].count()
steps_per_epoch = train_ds_size // (batch_size_per_worker * num_workers) 

# In[11]:

from ray.train.torch import TorchTrainer
from ray.train import RunConfig, ScalingConfig

trainer = TorchTrainer(
    train_loop_per_worker=train_fn,
    train_loop_config={ # corresponds to the config argument passed to train_fn
        "epochs": 1,
        "batch_size": batch_size_per_worker,  # per device
        "steps_per_epoch": steps_per_epoch,
    },
    scaling_config=ScalingConfig(
        num_workers=num_workers,
        use_gpu=True,
        resources_per_worker={"GPU": gpus_per_worker, "CPU": cpus_per_worker},
    ),
    datasets=ray_datasets,
    run_config=RunConfig(storage_path=storage_path),
)

trainer.fit()

outputs

2024-08-20 11:13:36,175 INFO worker.py:1596 -- Connecting to existing Ray cluster at address: 10.0.7.246:6379...
2024-08-20 11:13:36,301 INFO worker.py:1772 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265 
2024-08-20 11:13:38,876 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-16_10-12-30_542481_63697/logs/ray-data
2024-08-20 11:13:38,876 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
✔️  Dataset execution finished in 4.64 seconds: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 115/115 [00:04<00:00, 24.97 bundle/s]
- ReadCSV->SplitBlocks(115): 0 active, 0 queued, [cpu: 0.0, objects: 1017.0KB]: : 115 bundle [00:04, 24.97 bundle/s]
2024-08-20 11:13:43,584 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-08-16_10-12-30_542481_63697/logs/ray-data
2024-08-20 11:13:43,585 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
✔️  Dataset execution finished in 9.45 seconds: : 115 bundle [00:09, 12.19 bundle/s]             ]
- ReadCSV->SplitBlocks(115): 0 active, 0 queued, [cpu: 0.0, objects: 0.0B]: : 115 bundle [00:09, 12.19 bundle/s] 
- ReadCSV->SplitBlocks(115): 0 active, 0 queued, [cpu: 0.0, objects: 0.0B]: : 115 bundle [00:09, 18.21 bundle/s]
View detailed results here: /opt/data/models/ray-ds-gpt2/TorchTrainer_2024-08-20_11-13-58
To visualize your results with TensorBoard, run: `tensorboard --logdir /tmp/ray/session_2024-08-16_10-12-30_542481_63697/artifacts/2024-08-20_11-13-58/TorchTrainer_2024-08-20_11-13-58/driver_artifacts`
(TrainTrainable pid=3189010) No CUDA runtime is found, using CUDA_HOME='/usr/local/cuda'

Training started with configuration:
╭───────────────────────────────────────────╮
│ Training config                           │
├───────────────────────────────────────────┤
│ train_loop_config/batch_size            8 │
│ train_loop_config/epochs                1 │
│ train_loop_config/steps_per_epoch   12955 │
╰───────────────────────────────────────────╯
(RayTrainWorker pid=3189271) Setting up process group for: env:// [rank=0, world_size=4]
(RayTrainWorker pid=3189272) [W Utils.hpp:135] Warning: Environment variable NCCL_ASYNC_ERROR_HANDLING is deprecated; use TORCH_NCCL_ASYNC_ERROR_HANDLING instead (function getCvarInt)
(TorchTrainer pid=3189010) Started distributed worker processes: 
(TorchTrainer pid=3189010) - (node_id=a11e130fc7b1ba4eb19148b974927f2ff038709dc0434ea91f2268e8, ip=10.0.8.69, pid=3189271) world_rank=0, local_rank=0, node_rank=0
(TorchTrainer pid=3189010) - (node_id=a11e130fc7b1ba4eb19148b974927f2ff038709dc0434ea91f2268e8, ip=10.0.8.69, pid=3189272) world_rank=1, local_rank=1, node_rank=0
(TorchTrainer pid=3189010) - (node_id=195365cf9b8be4bd66cf01c38f307e303e73ab342898b499d548db93, ip=10.0.7.246, pid=2284536) world_rank=2, local_rank=0, node_rank=1
(TorchTrainer pid=3189010) - (node_id=195365cf9b8be4bd66cf01c38f307e303e73ab342898b499d548db93, ip=10.0.7.246, pid=2284537) world_rank=3, local_rank=1, node_rank=1
(RayTrainWorker pid=3189271) Preparing training arguments
(RayTrainWorker pid=3189271) [2024-08-20 11:14:17,895] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect)
(RayTrainWorker pid=3189271) [2024-08-20 11:14:18,308] [INFO] [comm.py:637:init_distributed] cdb=None
(RayTrainWorker pid=3189271) Loading model
(RayTrainWorker pid=2284537, ip=10.0.7.246) Model loaded
(RayTrainWorker pid=2284537, ip=10.0.7.246) Loading dataset
(RayTrainWorker pid=2284537, ip=10.0.7.246) Preparing training arguments [repeated 3x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)
(RayTrainWorker pid=2284537, ip=10.0.7.246) [2024-08-20 11:14:18,279] [INFO] [real_accelerator.py:203:get_accelerator] Setting ds_accelerator to cuda (auto detect) [repeated 3x across cluster]
(RayTrainWorker pid=2284537, ip=10.0.7.246) [2024-08-20 11:14:19,075] [INFO] [comm.py:637:init_distributed] cdb=None [repeated 3x across cluster]
(RayTrainWorker pid=2284537, ip=10.0.7.246) Loading model [repeated 3x across cluster]
(RayTrainWorker pid=2284536, ip=10.0.7.246) Dataset loaded
(RayTrainWorker pid=2284536, ip=10.0.7.246) Start training
(RayTrainWorker pid=3189271) Model loaded [repeated 3x across cluster]
(RayTrainWorker pid=3189271) Loading dataset [repeated 3x across cluster]
(RayTrainWorker pid=2284536, ip=10.0.7.246) max_steps is given, it will override any value given in num_train_epochs
(RayTrainWorker pid=2284536, ip=10.0.7.246) [W Utils.hpp:135] Warning: Environment variable NCCL_ASYNC_ERROR_HANDLING is deprecated; use TORCH_NCCL_ASYNC_ERROR_HANDLING instead (function getCvarInt) [repeated 3x across cluster]
(RayTrainWorker pid=2284536, ip=10.0.7.246) [rank2]:[W Utils.hpp:135] Warning: Environment variable NCCL_ASYNC_ERROR_HANDLING is deprecated; use TORCH_NCCL_ASYNC_ERROR_HANDLING instead (function getCvarInt)
(RayTrainWorker pid=3189271) Using /home/ubuntu/.cache/torch_extensions/py310_cu121 as PyTorch extensions root...
(RayTrainWorker pid=3189272) max_steps is given, it will override any value given in num_train_epochs [repeated 3x across cluster]
(RayTrainWorker pid=3189272) [rank1]:[W Utils.hpp:135] Warning: Environment variable NCCL_ASYNC_ERROR_HANDLING is deprecated; use TORCH_NCCL_ASYNC_ERROR_HANDLING instead (function getCvarInt) [repeated 3x across cluster]
(RayTrainWorker pid=2284537, ip=10.0.7.246) Emitting ninja build file /home/ubuntu/.cache/torch_extensions/py310_cu121/cpu_adam/build.ninja...
(RayTrainWorker pid=2284537, ip=10.0.7.246) Building extension module cpu_adam...
(RayTrainWorker pid=2284537, ip=10.0.7.246) Allowing ninja to set a default number of workers... (overridable by setting the environment variable MAX_JOBS=N)
(RayTrainWorker pid=2284537, ip=10.0.7.246) ninja: no work to do.
(RayTrainWorker pid=3189271) Dataset loaded [repeated 3x across cluster]
(RayTrainWorker pid=3189271) Start training [repeated 3x across cluster]
(RayTrainWorker pid=2284537, ip=10.0.7.246) Loading extension module cpu_adam...
(RayTrainWorker pid=2284537, ip=10.0.7.246) Time to load cpu_adam op: 3.08087420463562 seconds
(SplitCoordinator pid=3189703) StreamSplitDataIterator(epoch=-1, split=3) blocked waiting on other clients for more than 30s. All clients must read from the DataIterator splits at the same time. This warning will not be printed again for this epoch.
(RayTrainWorker pid=2284536, ip=10.0.7.246) Using /home/ubuntu/.cache/torch_extensions/py310_cu121 as PyTorch extensions root... [repeated 3x across cluster]
(RayTrainWorker pid=2284536, ip=10.0.7.246) Loading extension module cpu_adam...
(SplitCoordinator pid=3189703) StreamSplitDataIterator(epoch=-1, split=2) blocked waiting on other clients for more than 30s. All clients must read from the DataIterator splits at the same time. This warning will not be printed again for this epoch.