ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[AIR] various AIR usability issues around Checkpoints and predictions #28403

Closed maxpumperla closed 8 months ago

maxpumperla commented 1 year ago

What happened + What you expected to happen

I've created a gist that shows how to train a PyTorch model with AIR, store it in a checkpoint, and load it again in a separate script. The script is based on an existing example.

The problem is that some aspects are not well documented or are difficult for (new) users to understand.

Versions / Dependencies

ray 2.0.0

Reproduction script

For the full repro script, see this gist. I'm going through a couple of specific points here:

  1. The tutorial uses convert_batch_to_pandas to convert tuples to dataframes. While it's clear in principle why you'd want to split features and labels like this, pd.DataFrame({"image": images, "label": labels}), the tutorial doesn't explain why we do it (the train_loop_per_worker unpacks the columns later). Is this a pattern that should always be used? This is literally the only spot mentioning Pandas at all.

  2. Storing checkpoints is clear enough, and basically "everything" emits one. What's a little bothersome is the loading part. The examples we give in our docs are often not sufficient to understand what's going on, and the API references are simply not complete. E.g. BatchPredictor doesn't really tell you what to do:

Screenshot 2022-09-09 at 09 53 12

What's worse, the usage example given for TorchPredictor is simply wrong:

Screenshot 2022-09-09 at 09 54 39

What you actually have to do to load a Torch model is to provide a model definition alongside your checkpoint. But I think we're not properly telling users what model has to be. This is the correct call:

predictor = TorchPredictor.from_checkpoint(
    checkpoint=checkpoint,
    model=Net()
)

If you don't provide the model, this will complain about model_definition not being present:

ValueError: Attempting to load torch model from a state_dict, but no `model_definition` was provided.

No model_definition parameter exists (the argument is called model), and it's unclear what exactly a state_dict is. This raises the question (again, without reading the source code for hours) of what exactly a checkpoint is and what it contains.
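
For readers unfamiliar with the term, here is a minimal sketch of what a state_dict is in plain PyTorch, independent of Ray. The Net class below is a hypothetical stand-in, not the model from the gist. The point it illustrates: a state_dict holds only named tensors and no architecture, which is why a model instance has to be supplied when loading.

```python
import torch
from torch import nn


class Net(nn.Module):
    """Hypothetical toy model, used only to illustrate state_dict."""

    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(4, 2)

    def forward(self, x):
        return self.fc(x)


trained = Net()

# A state_dict maps parameter names to tensors. It does not contain
# the class definition or the layer structure.
state = trained.state_dict()
keys = list(state.keys())  # ['fc.weight', 'fc.bias']

# To restore the weights, the architecture must be created first,
# then the tensors are copied into it by name.
restored = Net()
restored.load_state_dict(state)
```

Seen this way, model=Net() in the call above presumably supplies the empty architecture that the stored tensors are loaded into.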

More generally, it's very difficult to figure out what model has to be. There is a Keras and a PyTorch example in the docs, but no general usage pattern, as far as I can tell. For instance, in the getting started guide for AIR we promise that checkpoints for XGBoost can be loaded from a model:

Screenshot 2022-09-09 at 10 01 19

but we never show what exactly the model spec has to be, and I have not figured out how to do this without checking the source code.

  3. It's annoying that I have to bring the model definition everywhere. Why isn't that self-contained in the checkpoint? E.g. if I want to load a checkpoint on an entirely different machine I still have to provide the Net() definition here:

from ray.train.torch import TorchCheckpoint, TorchPredictor

CHECKPOINT_PATH = "checkpoint"
checkpoint = TorchCheckpoint.from_directory(CHECKPOINT_PATH)
predictor = TorchPredictor.from_checkpoint(
    checkpoint=checkpoint,
    model=Net()
)

which means that I have to import my model definition from somewhere or paste the code for it in that script, too. That's very inconvenient and not quite what I expect when serialising a model as a checkpoint. The same holds true for deployments, in which case I have to provide the model yet again. All examples we have seem to assume that you load the model in the same script that you stored it in, which I don't think is the standard scenario.

  4. Making sense of "prediction inputs" is difficult. To someone not using this regularly it's not obvious why we have to do this:

outputs: ray.data.Dataset = batch_predictor.predict(
    data=test_dataset,
    dtype=torch.float,
    feature_columns=["image"],
    keep_columns=["label"],
)

In particular, why do I have to provide the data type here? A bit later in the tutorial we map_batches to convert dataframes in the outputs. At the same time (and I was surprised by that and expected this to fail), you can also simply pass numpy arrays to the predictor like this:

data = np.array(image, dtype=np.float32)
array = data.reshape((1, 3, 32, 32))
output = np.argmax(batch_predictor.predict(array))

It feels a bit like black magic why both work. It's hard to reason about this interface unless you check the internals. What are the assumptions going into this and how can we communicate this clearly?
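
To make the question concrete, here is a minimal sketch of how a single entry point could accept both input formats. It is not Ray's implementation: to_model_input is a hypothetical helper, and the assumption is that a DataFrame batch stores one array per row in the feature column, as in pd.DataFrame({"image": images, "label": labels}).

```python
import numpy as np
import pandas as pd


def to_model_input(batch, feature_column="image", dtype=np.float32):
    """Hypothetical helper: turn either batch format into one float array."""
    if isinstance(batch, pd.DataFrame):
        # Each row holds one image array; stack them along a new batch axis.
        return np.stack(batch[feature_column].to_numpy()).astype(dtype)
    # Anything array-like is assumed to already have a batch dimension.
    return np.asarray(batch, dtype=dtype)


# DataFrame batch: two 3x32x32 images plus labels.
images = [np.zeros((3, 32, 32), dtype=np.uint8) for _ in range(2)]
labels = [0, 1]
df_batch = pd.DataFrame({"image": images, "label": labels})
from_frame = to_model_input(df_batch)

# NumPy batch: a single image reshaped to a batch of one.
single = np.zeros((3, 32, 32), dtype=np.float32)
from_array = to_model_input(single.reshape((1, 3, 32, 32)))
```

Under these assumptions, feature_columns would select what gets stacked and dtype would fix the element type, while a NumPy input skips the selection step entirely. Whether the predictors actually work this way is exactly what the docs should state.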

The same goes for PredictorDeployment which seems to want numpy, too. Would an http_adapter work that converted the JSON input to a Dataset of pandas dataframes first? Honestly, I don't know.

My gut feeling is that it would be easy to come up with a couple of multiple choice questions like "What does this interface expect?" or "does this code work?" that'd lead to surprising results.

Issue Severity

Medium: It is a significant difficulty but I can work around it.

heng2j commented 1 year ago

Thank you @maxpumperla for your detailed write-up. I am also looking for better documentation on setting up an inference pipeline with TorchPredictor after training a model with Ray Train or Ray Tune.

I bumped into another issue in Ray 2.1.0.

I am using code similar to @maxpumperla's to load the checkpoint from a checkpoint path.

# This is a Ray Tune example
checkpoint_dir_path = "~/ray_results/TorchTrainer_2022-11-21_17-01-50/TorchTrainer_19f08_00000_0_2022-11-21_17-01-51/checkpoint_000009"

checkpoint = TorchCheckpoint.from_directory(checkpoint_dir_path)

predictor = TorchPredictor.from_checkpoint(checkpoint=checkpoint, model=src_model)

However, this time I am seeing the following error.

No item with key: model is found in the Checkpoint. Make sure this key exists when saving the checkpoint in ``TorchTrainer``.

I saved the model with Ray Tune and also with the following method with Ray Train:

# Saving checkpoint manually without Ray Tune
result = trainer.fit()
checkpoint_dir_path = "~/ray_results/Ray_trained"
result.checkpoint.to_directory(checkpoint_dir_path)

In both cases, there is no item with key model in the output checkpoint directories.
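
Going by the error message alone, the loader appears to look up a fixed key named model in the checkpoint dictionary. The sketch below mimics that lookup in plain Python. The MODEL_KEY constant and the get_model_state function are hypothetical illustrations, not Ray's code.

```python
MODEL_KEY = "model"  # assumption: the key name quoted in the error message


def get_model_state(checkpoint_dict):
    """Hypothetical lookup that fails the same way the error above does."""
    if MODEL_KEY not in checkpoint_dict:
        raise KeyError(
            f"No item with key: {MODEL_KEY} is found in the Checkpoint."
        )
    return checkpoint_dict[MODEL_KEY]


# A checkpoint that stores the weights under a different key name.
original = {"epoch": 0, "model_state_dict": {"w": 1.0}}

# The same data stored under the expected key name.
renamed = {"epoch": 0, "model": {"w": 1.0}}
renamed_state = get_model_state(renamed)
```

If that reading is right, any key other than model (for example model_state_dict) would be invisible to the loader even though the weights are present in the checkpoint.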

And from Checkpoint.to_directory() I see no option for saving out the model.

@richardliaw or another Ray team member, can you take a look?

amogkam commented 1 year ago

Hey @heng2j, can you provide your full code?

In particular, how are you saving the checkpoint inside the train_loop_per_worker for your TorchTrainer?

heng2j commented 1 year ago

Hi @amogkam,

Yeah, sure. I basically followed the code example from tune_cifar_torch_pbt_example.py.


def train_func(config):

    source_model = model_loader.get_model(...)
    source_train_loader, target_validation_loader =  data_loaders.get_loaders(...)

    epochs = config.get("epochs", 3)
    lr = config["lr"]
    batch_size = config["batch_size"]

    worker_batch_size = batch_size // session.get_world_size()

    model = train.torch.prepare_model(source_model)

    starting_epoch = 0

    train_loader = train.torch.prepare_data_loader(source_train_loader)
    validation_loader = train.torch.prepare_data_loader(target_validation_loader)

    # Create loss.
    criterion = nn.CrossEntropyLoss()

    for epoch in range(starting_epoch, epochs):
        train_epoch(train_loader, model, criterion, src_opt, numeric_labeler)
        result = validate_epoch(validation_loader, model, criterion)

        checkpoint = None 

        if epoch % 10 == 0:

            checkpoint = Checkpoint.from_dict(
                {
                    "epoch": epoch,
                    "model_state_dict": model.state_dict(),
                    "optimizer_state_dict": src_opt.state_dict(),
                }
            )

        session.report(result, checkpoint=checkpoint)

bveeramani commented 1 year ago

Hey @heng2j,

Does it work if you do this?

def train_func(config):

    source_model = model_loader.get_model(...)
    source_train_loader, target_validation_loader =  data_loaders.get_loaders(...)

    epochs = config.get("epochs", 3)
    lr = config["lr"]
    batch_size = config["batch_size"]

    worker_batch_size = batch_size // session.get_world_size()

    model = train.torch.prepare_model(source_model)

    starting_epoch = 0

    train_loader = train.torch.prepare_data_loader(source_train_loader)
    validation_loader = train.torch.prepare_data_loader(target_validation_loader)

    # Create loss.
    criterion = nn.CrossEntropyLoss()

    for epoch in range(starting_epoch, epochs):
        train_epoch(train_loader, model, criterion, src_opt, numeric_labeler)
        result = validate_epoch(validation_loader, model, criterion)

        checkpoint = None 

        if epoch % 10 == 0:

            checkpoint = Checkpoint.from_dict(
                {
                    "epoch": epoch,
-                   "model_state_dict": model.state_dict(),
+                   "model": model.state_dict(),
                    "optimizer_state_dict": src_opt.state_dict(),
                }
            )

        session.report(result, checkpoint=checkpoint)

heng2j commented 1 year ago

Hey @bveeramani,

Thank you for your recommendation. Changing the key in the checkpoint dict seems to have solved the issue I raised earlier. However, I hit a new problem when loading the checkpoint from the directory with TorchPredictor.from_checkpoint(), like below:

checkpoint = Checkpoint.from_directory(checkpoint_dir_path)
predictor = TorchPredictor.from_checkpoint(checkpoint=checkpoint, model=source_model, use_gpu=True)

This calls load_state_dict() on the model. But the model fails to load the state_dict properly, even though I provided source_model (not the one wrapped with train.torch.prepare_model()) as the reference.

class DenseNetDataParallel(nn.DataParallel):
    """DataParallel wrapper that exposes methods like extract_features."""
...
    def load_state_dict(self, state_dict, strict=True):
        """We want to save module state_dict, not wrapper's."""

        return self.module.load_state_dict(state_dict, strict=strict) # self.module is an instance of torch.nn.Module 

I am seeing the following errors. It seems that, for some reason, Ray saved the state dict under different key names (note the extra module. prefix):

Exception has occurred: RuntimeError
Error(s) in loading state_dict for DenseNet:
    Missing key(s) in state_dict: "features.conv0.weight", "features.norm0.weight", "features.norm0.bias", 

...
    Unexpected key(s) in state_dict: "module.features.conv0.weight", "module.features.norm0.weight", "module.features.norm0.bias", "module.features.norm0.running_mean", "module.features.norm0.running_var", "module.features.norm0.num_batches_tracked", "module.features.denseblock1.denselayer1.norm1.weight", 

This happens to models trained both with and without Tune.
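
The module. prefix in the unexpected keys is what PyTorch's DataParallel and DistributedDataParallel wrappers add to parameter names, so the state dict was probably taken from the wrapped model returned by train.torch.prepare_model(). Below is a minimal pure-Python sketch of stripping that prefix before loading. strip_prefix is a hypothetical helper and the values are placeholders for tensors.

```python
from collections import OrderedDict


def strip_prefix(state_dict, prefix="module."):
    """Hypothetical helper: drop a wrapper prefix from state_dict keys."""
    return OrderedDict(
        (key[len(prefix):] if key.startswith(prefix) else key, value)
        for key, value in state_dict.items()
    )


# Keys as they appear in the error above; values stand in for tensors.
wrapped = OrderedDict(
    [
        ("module.features.conv0.weight", 1),
        ("module.features.norm0.weight", 2),
    ]
)
plain = strip_prefix(wrapped)
plain_keys = list(plain.keys())
# ['features.conv0.weight', 'features.norm0.weight']
```

Two alternatives that avoid post-processing, both worth verifying against the installed versions: saving model.module.state_dict() inside the training function when the model is wrapped, or using torch.nn.modules.utils.consume_prefix_in_state_dict_if_present, which modifies the state dict in place.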

With Tune:

# With Tune
tuner = Tuner(
    trainer,
    tune_config=TuneConfig(
        num_samples=1, metric="loss", mode="min", 
    ),
    run_config=RunConfig(
        local_dir="~/output/expTestClassifier/ray_results/",
        stop={"training_iteration": 1 },
        failure_config=FailureConfig(max_failures=3),  # used for fault tolerance
    ),
)
results = tuner.fit()

Without Tune, I saved the model manually:


# Without Tune
result = trainer.fit()
print(f"Results: {result.metrics}")

checkpoint_dir_path = "~/output/expTestClassifier/ray_results/Ray_trained"
result.checkpoint.to_directory(checkpoint_dir_path)

Both use the same trainer with the updated train_func:

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={"lr": 1e-3, "batch_size": 8, "epochs": 1},
    scaling_config=ScalingConfig(
        num_workers=2, use_gpu=True
    ),
)

heng2j commented 1 year ago

Okay, I think I may have found the solution: the Saving Checkpoints section of the Deep Learning Guide describes how to save the checkpoint with a different prefix. I will give it a try over the weekend and post updates here.

TorchTrainer