ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Data] Cannot get the length of a tf dataset created from `ray_ds.to_tf` #33004

Open justinvyu opened 1 year ago

justinvyu commented 1 year ago

What happened + What you expected to happen

Converting a Ray Dataset to a tf.data.Dataset via ray_ds.to_tf(...) and then passing it to Keras model.fit() results in an error.

Traceback (most recent call last):
  File "/home/ray/workspace-project-justinvyu-dev/imageclf_raydata.py", line 172, in <module>
    result = trainer.fit()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/base_trainer.py", line 579, in fit
    raise result.error
ray.exceptions.RayTaskError(TypeError): ray::_Inner.train() (pid=14334, ip=10.0.57.102, repr=TensorflowTrainer)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/tune/trainable/trainable.py", line 384, in train
    raise skipped from exception_cause(skipped)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/utils.py", line 54, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayTaskError(TypeError): ray::RayTrainWorker._RayTrainWorker__execute() (pid=14452, ip=10.0.57.102, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7eff24d08a90>)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/worker_group.py", line 31, in __execute
    raise skipped from exception_cause(skipped)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/train/_internal/utils.py", line 129, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "/home/ray/workspace-project-justinvyu-dev/imageclf_raydata.py", line 121, in train_func
    f"# training batches per worker = {len(train_ds)} "
  File "/home/ray/anaconda3/lib/python3.9/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 531, in __len__
    raise TypeError("The dataset length is unknown.")
TypeError: The dataset length is unknown.
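
For reference, the underlying TypeError is plain TensorFlow behavior and can be reproduced without Ray: a tf.data.Dataset built from a Python generator has unknown cardinality, so calling len() on it raises. A minimal standalone illustration (no Ray involved):

import tensorflow as tf

def gen():
    for i in range(3):
        yield i

# A dataset built from a generator reports UNKNOWN cardinality (-2).
ds = tf.data.Dataset.from_generator(
    gen, output_signature=tf.TensorSpec(shape=(), dtype=tf.int32)
)
print(tf.data.experimental.cardinality(ds))  # tf.Tensor(-2, ...) == UNKNOWN_CARDINALITY
len(ds)  # TypeError: The dataset length is unknown.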

Workaround

train_ray_ds = session.get_dataset_shard("train")
train_ds = train_ray_ds.to_tf(
    feature_columns=["image"],
    label_columns=["label"],
    batch_size=batch_size_per_worker,
    local_shuffle_buffer_size=256,
)
# Workaround: set the TF dataset cardinality manually
train_ds = train_ds.apply(tf.data.experimental.assert_cardinality(train_ray_ds.count()))
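
With the cardinality asserted, len(train_ds) returns the asserted value instead of raising. Note that this is the row count reported by train_ray_ds.count(), not the number of batches the TF dataset yields:

print(len(train_ds))                               # == train_ray_ds.count()
print(tf.data.experimental.cardinality(train_ds))  # same value, as a tensor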

Versions / Dependencies

Ray 2.3.0

Reproduction script

import ray
import tensorflow as tf
from typing import Dict

IMG_SIZE = 224
NUM_CLASSES = 10

import numpy as np
from tensorflow.keras import layers
from tensorflow.keras.applications import EfficientNetB0

from ray.data.datasource.partitioning import Partitioning

ray.data.context.DatasetContext.get_current().use_streaming_executor = True

def get_dataset_for_split(split: str):
    data_folder = f"s3://anonymous@air-example-data/food-101-tiny/{split}"
    partitioning = Partitioning(
        "dir", field_names=["class"], base_dir=data_folder
    )

    def resize(batch: Dict[str, np.ndarray]):
        batch["image"] = tf.convert_to_tensor(batch["image"], dtype=tf.uint8)
        batch["image"] = tf.image.resize(batch["image"], (IMG_SIZE, IMG_SIZE)).numpy()
        return batch

    return ray.data.read_images(
        data_folder, size=(512, 512), partitioning=partitioning, mode="RGB"
    ).map_batches(resize, batch_format="numpy").random_shuffle()

train_ds, valid_ds = [get_dataset_for_split(split) for split in ("train", "valid")]

labels = valid_ds.groupby("class").count().to_pandas()
class_to_idx = {
    class_name: i
    for i, class_name in enumerate(labels["class"])
}

TRAIN_DS_LENGTH = int(train_ds.count())
VALID_DS_LENGTH = int(valid_ds.count())
NUM_WORKERS = 1

from ray.data.preprocessors import BatchMapper

def build_preprocessor():
    # 1. Map the image folder names to label ids
    def map_labels(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        batch["label"] = np.vectorize(class_to_idx.get)(batch["class"])
        return {"image": batch["image"], "label": batch["label"]}

    label_preprocessor = BatchMapper(map_labels, batch_format="numpy")
    return label_preprocessor

def build_model():
    inputs = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3))
    # x = img_augmentation(inputs)
    x = inputs
    model = EfficientNetB0(include_top=False, input_tensor=x, weights="imagenet")

    # Keep the pretrained weights trainable (set to False to freeze them)
    model.trainable = True

    # Rebuild top
    x = layers.GlobalAveragePooling2D(name="avg_pool")(model.output)
    x = layers.BatchNormalization()(x)

    top_dropout_rate = 0.2
    x = layers.Dropout(top_dropout_rate, name="top_dropout")(x)
    outputs = layers.Dense(NUM_CLASSES, activation="linear", name="pred")(x)

    # Compile
    model = tf.keras.Model(inputs, outputs, name="EfficientNet")
    return model

from ray.air import session

def train_func(config: dict):
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():  
        model = build_model()
        optimizer = tf.keras.optimizers.Adam()
        loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        model.compile(
            optimizer=optimizer,
            loss=loss_object,
            metrics=["accuracy"],
        )

    train_ray_ds = session.get_dataset_shard("train")
    train_ds = train_ray_ds.to_tf(
        feature_columns=["image"],
        label_columns=["label"],
        batch_size=64,
        local_shuffle_buffer_size=256,
    )
    # Uncomment this for it to work:
    # train_ds = train_ds.apply(tf.data.experimental.assert_cardinality(train_ray_ds.count()))
    train_ds = train_ds.map(lambda image, label: (tf.image.resize(image["image"], (IMG_SIZE, IMG_SIZE)), label["label"]))

    model.fit(train_ds, epochs=10)

from ray import air
from ray.train.tensorflow import TensorflowTrainer

use_gpu = False

trainer = TensorflowTrainer(
    train_loop_per_worker=train_func,
    datasets={"train": train_ds},
    preprocessor=build_preprocessor(),
    scaling_config=air.ScalingConfig(
        num_workers=1,
        use_gpu=use_gpu,
        trainer_resources={"CPU": 0},
        resources_per_worker={"CPU": 1.0, "GPU": 0},
    ),
)
result = trainer.fit()

Issue Severity

Medium: It is a significant difficulty but I can work around it.

justinvyu commented 1 year ago

Actually, this was a result of me calling len(ray_ds.to_tf(...)), not model.fit. Maybe we could set the cardinality of the tf dataset to ray_ds.count() inside to_tf?
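
A small user-side helper can approximate this in the meantime. This is a hypothetical sketch mirroring the workaround above, not part of the Ray API:

import tensorflow as tf

def to_tf_with_cardinality(ray_ds, **to_tf_kwargs):
    # Convert as usual, then attach the Ray dataset's row count as the
    # TF dataset's cardinality so that len() no longer raises.
    tf_ds = ray_ds.to_tf(**to_tf_kwargs)
    return tf_ds.apply(tf.data.experimental.assert_cardinality(ray_ds.count()))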

amogkam commented 1 year ago

@justinvyu this is expected, since the TensorFlow dataset is created from a generator: computing the length would require iterating through the entire dataset. See https://discuss.tensorflow.org/t/typeerror-dataset-length-is-unknown-tensorflow/948/2 for more info.

It is recommended to call dataset.count() beforehand, which can return the count from metadata without executing the dataset.
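
For example, inside train_func from the reproduction script above, the row count can be taken from the Ray dataset shard before converting, and the per-worker batch count derived from it (a sketch assuming the same names and batch size as the script):

import math

train_ray_ds = session.get_dataset_shard("train")
num_rows = train_ray_ds.count()  # served from metadata, no need to execute the dataset
train_ds = train_ray_ds.to_tf(
    feature_columns=["image"],
    label_columns=["label"],
    batch_size=64,
)
num_batches = math.ceil(num_rows / 64)  # the last batch may be partial
print(f"# training batches per worker = {num_batches}")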

amogkam commented 1 year ago

What is the use case?