ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io

TypeError in Ray TorchTrainer with StreamSplitDataIterator #47974

AliHaiderAhmad001 opened this issue 1 week ago

AliHaiderAhmad001 commented 1 week ago

What happened + What you expected to happen

Description:

I encountered a TypeError when running TorchTrainer in a Ray Tune experiment. The error occurs because StreamSplitDataIterator does not define a __len__() method, so calling len() on the dataset shard fails. This causes the trial to fail and stops the training process.

Error Traceback:

---------------------------------------------------------------------------
RayTaskError(TypeError)                   Traceback (most recent call last)
RayTaskError(TypeError): ray::_Inner.train() (pid=545925, ip=127.0.1.1, actor_id=8df214bd0f57efa0f244450001000000, repr=TorchTrainer)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 57, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayTaskError(TypeError): ray::_RayTrainWorker__execute.get_next() (pid=546125, ip=127.0.1.1, actor_id=2c8fbaded7de91dfce4cb83501000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7099dcd57c70>)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 33, in __execute
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 176, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "/tmp/ipykernel_542780/2278135304.py", line 29, in train_loop_per_worker
TypeError: object of type 'StreamSplitDataIterator' has no len()

Expected Behavior:

I expected the training loop to handle the StreamSplitDataIterator properly without raising the TypeError.

Actual Behavior:

The training loop attempts to take the length of an object that lacks a __len__ method, and the resulting TypeError prevents the training process from completing.
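
For context, the failure can be reproduced outside of Train in a few lines. This is my own minimal reduction (not from the original run); it assumes Dataset.streaming_split(), which appears to be how Train produces the per-worker shard iterators, given the class name in the traceback:

import ray

ds = ray.data.range(8)
# streaming_split() returns DataIterator objects (StreamSplitDataIterator here);
# they support iter_batches()/iter_torch_batches() but do not implement __len__.
it = ds.streaming_split(1)[0]
len(it)  # TypeError: object of type 'StreamSplitDataIterator' has no len()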

Detailed Traceback

2024-10-10 16:42:48,349 DEBUG resource_updater.py:258 -- Checking Ray cluster resources.
2024-10-10 16:42:48,673 DEBUG tune_controller.py:1240 -- Future TRAIN FAILED for trial TorchTrainer_ccdf1_00000: ray::_Inner.train() (pid=545925, ip=127.0.1.1, actor_id=8df214bd0f57efa0f244450001000000, repr=TorchTrainer)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 57, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayTaskError(TypeError): ray::_RayTrainWorker__execute.get_next() (pid=546125, ip=127.0.1.1, actor_id=2c8fbaded7de91dfce4cb83501000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7099dcd57c70>)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 33, in __execute
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 176, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "/tmp/ipykernel_542780/2278135304.py", line 29, in train_loop_per_worker
TypeError: object of type 'StreamSplitDataIterator' has no len()
2024-10-10 16:42:48,674 ERROR tune_controller.py:1331 -- Trial task failed for trial TorchTrainer_ccdf1_00000
Traceback (most recent call last):
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/_private/worker.py", line 2691, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TypeError): ray::_Inner.train() (pid=545925, ip=127.0.1.1, actor_id=8df214bd0f57efa0f244450001000000, repr=TorchTrainer)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 57, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayTaskError(TypeError): ray::_RayTrainWorker__execute.get_next() (pid=546125, ip=127.0.1.1, actor_id=2c8fbaded7de91dfce4cb83501000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7099dcd57c70>)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 33, in __execute
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 176, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "/tmp/ipykernel_542780/2278135304.py", line 29, in train_loop_per_worker
TypeError: object of type 'StreamSplitDataIterator' has no len()
2024-10-10 16:42:48,843 DEBUG tune_controller.py:1367 -- Requesting to STOP actor for trial TorchTrainer_ccdf1_00000
2024-10-10 16:42:48,845 DEBUG tune_controller.py:735 -- Setting status for trial TorchTrainer_ccdf1_00000 from RUNNING to ERROR
2024-10-10 16:42:48,845 DEBUG tune_controller.py:1396 -- Terminating actor for trial TorchTrainer_ccdf1_00000: <TrackedActor 55643445138953405172927010354651748243>
2024-10-10 16:42:48,852 DEBUG experiment_state.py:122 -- Experiment state snapshotting took 0.00 seconds. Adjusting snapshotting period to 10.00 seconds.
2024-10-10 16:42:48,941 DEBUG experiment_state.py:122 -- Experiment state snapshotting took 0.08 seconds. Adjusting snapshotting period to 10.00 seconds.
2024-10-10 16:42:48,942 INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/home/ali/Desktop/projects/AIUpdateHub/efs/llm' in 0.0838s.
2024-10-10 16:42:48,949 DEBUG tune_controller.py:784 -- CLEANING UP all trials
2024-10-10 16:42:48,952 DEBUG tune_controller.py:800 -- Waiting for actor manager to clean up final state [dedup]
== Status ==
Current time: 2024-10-10 16:42:48 (running for 00:00:50.81)
Using FIFO scheduling algorithm.
Logical resource usage: 2.0/4 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2024-10-10_16-36-38_318744_542780/artifacts/2024-10-10_16-41-57/llm/driver_artifacts
Number of trials: 1/1 (1 ERROR)
Number of errored trials: 1
+--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Trial name               |   # failures | error file                                                                                                                                                     |
|--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| TorchTrainer_ccdf1_00000 |            1 | /tmp/ray/session_2024-10-10_16-36-38_318744_542780/artifacts/2024-10-10_16-41-57/llm/driver_artifacts/TorchTrainer_ccdf1_00000_0_2024-10-10_16-41-58/error.txt |
+--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+

== Status ==
Current time: 2024-10-10 16:42:48 (running for 00:00:50.90)
Using FIFO scheduling algorithm.
Logical resource usage: 2.0/4 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2024-10-10_16-36-38_318744_542780/artifacts/2024-10-10_16-41-57/llm/driver_artifacts
Number of trials: 1/1 (1 ERROR)
Number of errored trials: 1
+--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Trial name               |   # failures | error file                                                                                                                                                     |
|--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| TorchTrainer_ccdf1_00000 |            1 | /tmp/ray/session_2024-10-10_16-36-38_318744_542780/artifacts/2024-10-10_16-41-57/llm/driver_artifacts/TorchTrainer_ccdf1_00000_0_2024-10-10_16-41-58/error.txt |
+--------------------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+

2024-10-10 16:42:49,091 DEBUG tune_controller.py:1149 -- Actor STOPPED: <TrackedActor 55643445138953405172927010354651748243>
2024-10-10 16:42:49,094 DEBUG tune_controller.py:805 -- Force cleanup of remaining actors
2024-10-10 16:42:49,099 ERROR tune.py:1037 -- Trials did not complete: [TorchTrainer_ccdf1_00000]
2024-10-10 16:42:49,103 INFO tune.py:1041 -- Total run time: 51.81 seconds (50.81 seconds for the tuning loop).
---------------------------------------------------------------------------
RayTaskError(TypeError)                   Traceback (most recent call last)
RayTaskError(TypeError): ray::_Inner.train() (pid=545925, ip=127.0.1.1, actor_id=8df214bd0f57efa0f244450001000000, repr=TorchTrainer)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 57, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayTaskError(TypeError): ray::_RayTrainWorker__execute.get_next() (pid=546125, ip=127.0.1.1, actor_id=2c8fbaded7de91dfce4cb83501000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7099dcd57c70>)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 33, in __execute
    raise skipped from exception_cause(skipped)
  File "/home/ali/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 176, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "/tmp/ipykernel_542780/2278135304.py", line 29, in train_loop_per_worker
TypeError: object of type 'StreamSplitDataIterator' has no len()

The above exception was the direct cause of the following exception:

TrainingFailedError                       Traceback (most recent call last)
File <timed exec>:2

File ~/Desktop/projects/AIUpdateHub/venv/lib/python3.10/site-packages/ray/train/base_trainer.py:638, in BaseTrainer.fit(self)
    634 result = result_grid[0]
    635 if result.error:
    636     # Raise trainable errors to the user with a message to restore
    637     # or configure FailureConfig in a new run.
--> 638     raise TrainingFailedError(
    639         "\n".join([restore_msg, TrainingFailedError._FAILURE_CONFIG_MSG])
    640     ) from result.error
    641 return result

TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run.
To continue this run, you can use: trainer = TorchTrainer.restore("/home/ali/Desktop/projects/AIUpdateHub/efs/llm").
To start a new run that will retry on training failures, set train.RunConfig(failure_config=train.FailureConfig(max_failures)) in the Trainer's run_config with max_failures > 0, or max_failures = -1 for unlimited retries

Versions / Dependencies

Environment:

Ray version: 2.37.0
Python version: 3.10
OS: Ubuntu
Hardware: CPU-based training on a local laptop

Reproduction script

Related code

----------- Trainer -----------------

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config=train_loop_config,
    scaling_config=scaling_config,
    run_config=run_config,
    datasets={"train": train_ds, "val": val_ds},
    dataset_config=dataset_config,
    metadata={"class_to_index": preprocessor.class_to_index}
)

------------------------ train_step -------------------------

def train_step(ds, batch_size, model, num_classes, loss_fn, optimizer):
    """Train step."""
    model.train()  # Set model to training mode
    cumulative_loss = 0.0  # Initialize cumulative loss
    #ds_generator = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)  # Batch-wise generator
    ds_generator = ds.iter_batches(batch_size=batch_size, batch_format="torch", collate_fn=collate_fn)
    # Loop over batches
    for i, batch in enumerate(ds_generator):
        optimizer.zero_grad()  # Reset gradients before each batch
        z = model(batch)  # Forward pass

        # Ensure that targets are one-hot encoded properly
        targets = F.one_hot(batch["targets"], num_classes=num_classes).float()

        # Calculate loss
        loss = loss_fn(z, targets)

        # Backpropagation
        loss.backward()  # Backward pass
        optimizer.step()  # Update model weights

        # Calculate cumulative loss
        cumulative_loss += loss.detach().item()

    # Return the average loss over all batches
    return cumulative_loss / (i + 1)

----------------- train_loop_per_worker -------------------------

# Set up logging
logging.basicConfig(level=logging.INFO)  # You can set this to DEBUG for more detail
logger = logging.getLogger(__name__)

# Training loop
def train_loop_per_worker(config):
    # Hyperparameters
    dropout_p = config["dropout_p"]
    lr = config["lr"]
    lr_factor = config["lr_factor"]
    lr_patience = config["lr_patience"]
    num_epochs = config["num_epochs"]
    batch_size = config["batch_size"]
    num_classes = config["num_classes"]

    # Get datasets
    set_seeds()
    logger.info("Loading dataset shards...")
    train_ds = train.get_dataset_shard("train")
    val_ds = train.get_dataset_shard("val")
    logger.info(f"Dataset shards loaded. Training data size: {len(train_ds)}, Validation data size: {len(val_ds)}")

    # Model
    llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
    model = FinetunedLLM(llm=llm, dropout_p=dropout_p, embedding_dim=llm.config.hidden_size, num_classes=num_classes)
    model = train.torch.prepare_model(model)
    logger.info("Model initialized.")

    # Training components
    loss_fn = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=lr_factor, patience=lr_patience)
    logger.info("Optimizer and scheduler initialized.")

    # Training
    num_workers = train.get_context().get_world_size()
    batch_size_per_worker = batch_size // num_workers
    logger.info(f"Training started with {num_workers} workers and batch size per worker: {batch_size_per_worker}")

    for epoch in range(num_epochs):
        logger.info(f"Epoch {epoch + 1}/{num_epochs}")
        train_loss = train_step(train_ds, batch_size_per_worker, model, num_classes, loss_fn, optimizer)
        logger.info(f"Training loss for epoch {epoch + 1}: {train_loss}")

        val_loss, _, _ = eval_step(val_ds, batch_size_per_worker, model, num_classes, loss_fn)
        logger.info(f"Validation loss for epoch {epoch + 1}: {val_loss}")
        scheduler.step(val_loss)

        # Checkpoint
        with tempfile.TemporaryDirectory() as dp:
            if isinstance(model, torch.nn.parallel.DistributedDataParallel):  # CPU case
                model.module.save(dp=dp)
            else:
                model.save(dp=dp)
            metrics = dict(epoch=epoch, lr=optimizer.param_groups[0]["lr"], train_loss=train_loss, val_loss=val_loss)
            checkpoint = Checkpoint.from_directory(dp)
            train.report(metrics, checkpoint=checkpoint)
            logger.info(f"Epoch {epoch + 1} completed and checkpoint saved.")

--------- load_data ----------

def load_data(dataset_loc: str, num_samples: int = None) -> Dataset:
    """Load data from source into a Ray Dataset.

    Args:
        dataset_loc (str): Location of the dataset.
        num_samples (int, optional): The number of samples to load. Defaults to None.

    Returns:
        Dataset: Our dataset represented by a Ray Dataset.
    """
    ds = ray.data.read_csv(dataset_loc)
    ds = ds.random_shuffle(seed=1234)
    ds = ray.data.from_items(ds.take(num_samples)) if num_samples else ds
    return ds

Issue Severity

High: It blocks me from completing my task.

justinvyu commented 1 week ago

@AliHaiderAhmad001 The len() call seems to be inside your own user-defined function. Could you take a look at this line to see what it is?

  File "/tmp/ipykernel_542780/2278135304.py", line 29, in train_loop_per_worker