[Closed] daviddwlee84 closed this issue 9 months ago.
ray.train.xgboost.xgboost_trainer — Ray 2.8.0
According to that source, the filename is "model.json", but somehow the dumped checkpoints are still in the legacy binary format.
Quick workaround for now:
import os
import xgboost
from ray.train import Checkpoint
from ray.train.xgboost import XGBoostTrainer

# Replace all XGBoostTrainer usages with MyXGBoostTrainer
class MyXGBoostTrainer(XGBoostTrainer):
    @staticmethod
    def get_model(checkpoint: Checkpoint) -> xgboost.Booster:
        """Retrieve the XGBoost model stored in this checkpoint."""
        with checkpoint.as_directory() as checkpoint_path:
            booster = xgboost.Booster()
            # Load the legacy binary file that Ray actually wrote ("model", not "model.json")
            booster.load_model(os.path.join(checkpoint_path, 'model'))
            return booster

    def _save_model(self, model: xgboost.Booster, path: str) -> None:
        model.save_model(os.path.join(path, 'model'))
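Hypothetical usage of the workaround above (assuming `trainer` was built with MyXGBoostTrainer and the run has finished):

result = trainer.fit()
# Pick the best checkpoint by validation logloss and load it with the patched loader
checkpoint = result.get_best_checkpoint('valid-logloss', 'min')
booster = MyXGBoostTrainer.get_model(checkpoint)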
(Additional question, not closely related to the fail-to-load issue.)
Is it possible to keep using the same checkpoint directory and checkpoint index when using resume_from_checkpoint?
For example, if I load from Checkpoint(filesystem=local, path=/mnt/NAS/sda/ShareFolder/lidawei/ExperimentNotebook/ray_debug/XGBoost_ResumeExperiment/XGBoostTrainer_96983_00000_0_2023-12-05_13-53-18/checkpoint_000017), I expect that creating another XGBoostTrainer with resume_from_checkpoint pointing to this checkpoint and setting num_boost_round=20 would continue training for another 20 rounds and produce a checkpoint like checkpoint_000037.
But instead, it creates another folder under /mnt/NAS/sda/ShareFolder/lidawei/ExperimentNotebook/ray_debug/XGBoost_ResumeExperiment/, I have to increase num_boost_round to something like 40 (> 20) to continue training, and the checkpoint index starts over from 0.
This is inconvenient if I want to monitor the run in TensorBoard, where I expected it to show up as the "same experiment". I think this experience is a little bit weird.
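For clarity, a minimal sketch of the setup I am describing (the trainer arguments are assumed to match the reproduction script further down; this is not a verified recipe):

# Sketch: resume from an existing checkpoint and (ideally) continue the same experiment
resume_trainer = XGBoostTrainer(
    scaling_config=scaling_config,
    run_config=run_config,  # same experiment name / storage_path as the original run
    label_column="target",
    num_boost_round=20,  # expectation: 20 *more* rounds, continuing the checkpoint index
    params={"objective": "binary:logistic", "eval_metric": ["logloss", "error"]},
    datasets={"train": train_dataset, "valid": valid_dataset},
    resume_from_checkpoint=Checkpoint(
        path="/mnt/NAS/sda/ShareFolder/lidawei/ExperimentNotebook/ray_debug/XGBoost_ResumeExperiment/XGBoostTrainer_96983_00000_0_2023-12-05_13-53-18/checkpoint_000017"
    ),
)
result = resume_trainer.fit()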
(If I only wanted to restore from a failure, I would use ray.train.xgboost.XGBoostTrainer.restore and do something like the following.)
# NOTE: the datasets are the same here, but when using `resume_from_checkpoint` I would use incremental data.
trainer_restore = MyXGBoostTrainer.restore(
    os.path.dirname(os.path.dirname(checkpoint.path)),
    datasets={"train": train_dataset, "valid": valid_dataset},
)
result_restore = trainer_restore.fit()
I found that printing the path argument inside XGBoostTrainer._save_model shows a dynamically generated temporary directory path like /tmp/tmppbsxfulk. (This issue seems specific to Ray 2.8+.)
I haven't found where Ray actually dumps the model itself; maybe it is controlled by the StorageContext. Not sure.
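For reference, roughly how I observed this (a debug-only sketch; the print is my own addition and DebugXGBoostTrainer is a hypothetical name, reusing the imports from the snippet above):

class DebugXGBoostTrainer(XGBoostTrainer):
    def _save_model(self, model: xgboost.Booster, path: str) -> None:
        # `path` turns out to be a temp dir such as /tmp/tmppbsxfulk, not the storage_path
        print("Saving model to:", path)
        super()._save_model(model, path)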
To summarize:
- Maybe it is ray.train.report(..., checkpoint=checkpoint) that dumps the checkpoint to "persistent storage".
- I expected model.json (so the Trainer._save_model() is not working properly) but got model instead.
- Adding SyncConfig(sync_artifacts=True) to RunConfig still does not help.

import ray
from ray.train import SyncConfig, RunConfig, CheckpointConfig, FailureConfig, ScalingConfig, Checkpoint
from ray.train.xgboost import XGBoostTrainer
import xgboost as xgb
import os
class MyXGBoostTrainer(XGBoostTrainer):
"""
Workaround
https://github.com/ray-project/ray/issues/41608
https://github.com/dmlc/xgboost/issues/3089
"""
@staticmethod
def get_model(checkpoint: Checkpoint) -> xgb.Booster:
"""Retrieve the XGBoost model stored in this checkpoint."""
with checkpoint.as_directory() as checkpoint_path:
booster = xgb.Booster()
booster.load_model(
os.path.join(checkpoint_path, 'model')
)
if booster.attr('feature_names') is not None:
booster.feature_names = booster.attr(
'feature_names').split('|')
return booster
def _save_model(self, model: xgb.Booster, path: str) -> None:
"""
BUG: somehow we are not saving to the correct place we want
https://github.com/ray-project/ray/issues/41608
        The path points directly to a temp directory like
        /tmp/tmppbsxfulk
"""
if hasattr(model, 'feature_names'):
model.set_attr(feature_names='|'.join(model.feature_names))
model.save_model(os.path.join(path, 'model'))
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
sync_config = SyncConfig(sync_artifacts=True)
run_config = RunConfig(
name=f"XGBoost_Test_Checkpoint_Save_Load",
storage_path="/NAS/ShareFolder/ray_debug",
checkpoint_config=CheckpointConfig(
checkpoint_frequency=1,
num_to_keep=10,
checkpoint_at_end=True,
checkpoint_score_attribute='train-error',
checkpoint_score_order='min',
),
failure_config=FailureConfig(max_failures=2),
sync_config=sync_config,
)
scaling_config = ScalingConfig(
num_workers=3,
placement_strategy="SPREAD",
use_gpu=False,
)
trainer = XGBoostTrainer(
scaling_config=scaling_config,
run_config=run_config,
label_column="target",
num_boost_round=20,
params={
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
},
datasets={"train": train_dataset, "valid": valid_dataset},
)
result = trainer.fit()
checkpoint = result.get_best_checkpoint('valid-logloss', 'min')
# Failed using XGBoostTrainer
booster = XGBoostTrainer.get_model(checkpoint)
# This will work
booster = MyXGBoostTrainer.get_model(checkpoint)
# But feature_names is None
print(booster.feature_names)
If I modify the Trainer to store the entire booster as a pickle, I still cannot find model.pickle in the destination directory.
import pickle

class XGBoostTrainerWithPickle(XGBoostTrainer):
    @staticmethod
    def get_model(checkpoint: Checkpoint) -> xgb.Booster:
        with checkpoint.as_directory() as checkpoint_path:
            with open(os.path.join(checkpoint_path, 'model.pickle'), 'rb') as fp:
                booster = pickle.load(fp)
            return booster

    def _save_model(self, model: xgb.Booster, path: str) -> None:
        with open(os.path.join(path, 'model.pickle'), 'wb') as fp:
            pickle.dump(model, fp)
Confirmed it looks more like a ray.train.report issue, not an XGBoostTrainer one.
class MyXGBoostTrainer(XGBoostTrainer):
@staticmethod
def get_model(checkpoint: Checkpoint) -> xgb.Booster:
with checkpoint.as_directory() as checkpoint_path:
booster = xgb.Booster()
booster.load_model(
os.path.join(checkpoint_path, 'model.json')
)
if booster.attr('feature_names') is not None:
booster.feature_names = booster.attr(
'feature_names').split('|')
return booster
def _save_model(self, model: xgb.Booster, path: str) -> None:
if hasattr(model, 'feature_names'):
model.set_attr(feature_names='|'.join(model.feature_names))
model.save_model(os.path.join(path, 'model.json'))
        # Can find ['model.json'] in the temp dir
print(os.listdir(path))
print(ckpt := Checkpoint.from_directory(path))
# Successfully load XGBoost booster and print feature names
print(MyXGBoostTrainer.get_model(ckpt).feature_names)
@daviddwlee84 Thanks for posting all of your investigation, I'll take a closer look today.
@daviddwlee84 I am not able to reproduce this with all the same package versions as you -- the biggest thing I can think of is xgboost_ray being out of date and saving to the model file instead of the updated model.json. Could you double-check your xgboost_ray version and upgrade it to the latest?
Sure, I will take a look today to see if it is an xgboost_ray-related issue. (The version is already the latest.)
# https://github.com/ray-project/xgboost_ray/releases/tag/v0.1.19
$ pip list | grep xgboost-ray
xgboost-ray    0.1.19
But the weird thing is that no matter how I modify XGBoostTrainer (the get_model() and especially the _save_model() method), the content at the destination stays unchanged. (So modifying get_model() to load whatever it gives me is the only workaround for now.)
I tried:
- saving as model.json
- renaming model.json to model
- storing extra information (e.g., feature_name) in the booster attributes

Conclusion:
- The destination folder configured in RunConfig will always and can only contain the legacy model file, without any of the operations I did in _save_model().
- I haven't found how ray.train.report saves the Checkpoint to the destination. I expected there to be either a copy operation (with the same content as what's in the temp dir) or a re-invocation of _save_model() (which would force the booster checkpoint back to the legacy model output name).
- Maybe it is _TrainSession.persist_artifacts's job, but I haven't found out how it works.
@justinvyu Did your checkpoint content change when you modified XGBoostTrainer._save_model()?
Maybe another clue is where the warning message was generated:
WARNING: /workspace/src/c_api/c_api.cc:1240: Saving into deprecated binary model format, please consider using `json` or `ubj`. Model format will default to JSON in XGBoost 2.2 if not specified.
I can do a quick experiment inside get_model():
class MyXGBoostTrainer(XGBoostTrainer):
@staticmethod
def get_model(checkpoint: Checkpoint) -> xgb.Booster:
with checkpoint.as_directory() as checkpoint_path:
booster = xgb.Booster()
booster.load_model(
os.path.join(checkpoint_path, 'model.json')
)
# This will dump the user warning
# WARNING: /workspace/src/c_api/c_api.cc:1240: Saving into deprecated binary model format, please consider using `json` or `ubj`. Model format will default to JSON in XGBoost 2.2 if not specified.
booster.save_model('model')
# This works fine
booster.save_model('model.json')
It shows that as long as I never call booster.save_model() without a *.json suffix inside _save_model(), this warning should not appear, unless something somewhere overrides my code.
Somehow Ray is not using what I set in _save_model(), and I think xgboost_ray is only involved in the worker training part: the TensorBoard-like checkpoint directory structure is generated by the Ray Tune API, while xgboost_ray only returns the booster and we have to save it manually.
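For context, a rough standalone sketch of the xgboost_ray API I am referring to (assuming a pandas DataFrame as input; this is not the exact code the trainer runs):

import pandas as pd
from xgboost_ray import RayDMatrix, RayParams, train

# xgboost_ray's train() only returns a Booster; the caller chooses where/how to save it.
df = pd.read_csv("breast_cancer.csv")  # placeholder data file
dtrain = RayDMatrix(df, label="target")
booster = train(
    {"objective": "binary:logistic", "eval_metric": ["logloss", "error"]},
    dtrain,
    num_boost_round=20,
    ray_params=RayParams(num_actors=3),
)
booster.save_model("model.json")  # filename/format is entirely up to the caller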
@daviddwlee84
"I haven't found how ray.train.report saves the Checkpoint to the destination."
This is where the checkpoint gets persisted: it does get copied from the temp dir to the location on persistent storage (NFS/S3) during the ray.train.report call.
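For context, a minimal sketch of the public checkpoint-reporting pattern this goes through (a custom training function, not the actual GBDTTrainer internals):

import os
import tempfile
from ray import train
from ray.train import Checkpoint

def train_func(config):
    for i in range(config["num_iterations"]):
        # ... one round of training here ...
        with tempfile.TemporaryDirectory() as tmpdir:
            # Write checkpoint files into tmpdir (e.g. booster.save_model(...)).
            with open(os.path.join(tmpdir, "model.json"), "w") as f:
                f.write("{}")  # stand-in for a real model dump
            # report() is what copies the checkpoint dir to RunConfig.storage_path.
            train.report({"iter": i}, checkpoint=Checkpoint.from_directory(tmpdir))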
"Did your checkpoint content change when you modified XGBoostTrainer._save_model()?"
I tried this:
class MyXGBoostTrainer(XGBoostTrainer):
def _save_model(self, model, path: str):
model.save_model(os.path.join(path, "model.ubj"))
This works fine for me:
$ ls /home/ray/ray_results/XGBoost_ResumeExperiment/MyXGBoostTrainer_da263_00000_0_2023-12-15_13-35-25/checkpoint_000020
model.ubj
Q: What's your cluster setup? Are you running on multiple nodes, and is the xgboost/xgboost_ray/ray version the same on every node?
"Q: What's your cluster setup? Are you running on multiple nodes, and is the xgboost/xgboost_ray/ray version the same on every node?"
I have three machines. I set up the workspace (/mnt/NAS/ShareFolder/MyRepo) in a NAS directory which is accessible to all three machines and mounted under the same directory structure on each of them.
In the workspace, I created a Python 3.8.13 virtual environment (/mnt/NAS/ShareFolder/MyRepo/MyVenv) with ray==2.8.1, xgboost-ray==0.1.19, and xgboost==2.0.2 installed.
And I start the cluster like this
# launch_ray_head_node.sh
RAY_record_ref_creation_sites=1 RAY_PROMETHEUS_HOST=http://192.168.222.235:9000 RAY_GRAFANA_HOST=http://192.168.222.235:3000 RAY_scheduler_spread_threshold=0.0 /mnt/NAS/ShareFolder/MyRepo/MyVenv/bin/ray start --head --node-ip-address 192.168.222.235 --port 6379 --dashboard-host 0.0.0.0 --dashboard-port 8265 --object-store-memory 450000000000
# launch_ray_worker_node.sh
RAY_record_ref_creation_sites=1 RAY_scheduler_spread_threshold=0.0 /mnt/NAS/ShareFolder/MyRepo/MyVenv/bin/ray start --address 192.168.222.235:6379 --object-store-memory 450000000000
Then I run the training job:
/mnt/NAS/ShareFolder/MyRepo/MyVenv/bin/python -m trainer.ray_training
In this script, I have a RunConfig like this, which directs the checkpoint to the NAS share folder
run_config = RunConfig(
name="ExperimentName",
storage_path="/mnt/NAS/ShareFolder/MyRepo/Results",
...
)
If the Ray version were inconsistent, it would raise an error at the cluster startup phase, but I am not sure whether it warns about other packages.
I tried to print the package version by doing this
import ray
import logging
ray.init()
@ray.remote(scheduling_strategy='SPREAD')
class Actor:
def __init__(self):
logging.basicConfig(level=logging.INFO)
def log(self):
logger = logging.getLogger(__name__)
import xgboost
import xgboost_ray
logger.info({
'xgboost': xgboost.__version__,
'xgboost_ray': xgboost_ray.__version__,
'ray': ray.__version__,
})
for _ in range(3):
actor = Actor.remote()
ray.get(actor.log.remote())
And I get the following logs:
/mnt/NAS/ShareFolder/MyRepo/MyVenv/bin/python /mnt/NAS/ShareFolder/MyRepo/ray_environment_check.py
2023-12-18 10:19:37,829 INFO worker.py:1489 -- Connecting to existing Ray cluster at address: 192.168.222.235:6379...
2023-12-18 10:19:37,858 INFO worker.py:1664 -- Connected to Ray cluster. View the dashboard at http://192.168.222.235:8265
(Actor pid=38713, ip=192.168.222.236) INFO:__main__:{'xgboost': '2.0.2', 'xgboost_ray': '0.1.19', 'ray': '2.8.1'}
(Actor pid=35015, ip=192.168.222.237) INFO:__main__:{'xgboost': '2.0.2', 'xgboost_ray': '0.1.19', 'ray': '2.8.1'}
(Actor pid=38897) INFO:__main__:{'xgboost': '2.0.2', 'xgboost_ray': '0.1.19', 'ray': '2.8.1'}
Not sure if this confirms they are using the same packages.
I found that only the last iteration's checkpoint goes through _save_model(); the other iterations' checkpoint dumps never call _save_model().
It seems that if GBDTTrainer._checkpoint_at_end() only covers the very end, then that part works as expected.
Okay, I found the exact issue! The "non-end" checkpoints are saved by _tune_callback_checkpoint_cls, and the filename "model" is set there.
Now I am building a workaround for this. If it is successful, I will post the solution here.
But it is tricky and non-intuitive that not all models are saved by the trainer's _save_model() method.
@justinvyu I think this issue can be reproduced by setting a non-zero checkpoint_frequency in CheckpointConfig:
checkpoint_config=CheckpointConfig(
checkpoint_frequency=1,
num_to_keep=10,
checkpoint_at_end=True,
),
Workaround using a self-defined checkpoint callback:
from typing import Optional
import ray
from ray.train import SyncConfig, RunConfig, CheckpointConfig, FailureConfig, ScalingConfig, Checkpoint
from ray.train.xgboost import XGBoostTrainer
from ray.tune.integration.xgboost import TuneReportCheckpointCallback
from contextlib import contextmanager
import tempfile
import xgboost as xgb
import os
class MyXGBoostCheckpointCallback(TuneReportCheckpointCallback):
@contextmanager
def _get_checkpoint(
self, model: xgb.Booster, epoch: int, filename: str, frequency: int
) -> Optional[Checkpoint]:
if not frequency or epoch % frequency > 0 or (not epoch and frequency > 1):
# Skip 0th checkpoint if frequency > 1
yield None
return
with tempfile.TemporaryDirectory() as checkpoint_dir:
if hasattr(model, 'feature_names'):
model.set_attr(feature_names='|'.join(model.feature_names))
model.save_model(os.path.join(checkpoint_dir, filename))
checkpoint = Checkpoint.from_directory(checkpoint_dir)
yield checkpoint
class MyXGBoostTrainer(XGBoostTrainer):
    # HERE
    # This override is a must-have, even though we also pass the callback via the trainer's
    # `callbacks` argument. In GBDTTrainer's training loop, Ray checks this class; if the
    # callback passed in is not the same one, it creates another, and you end up dumping
    # duplicate checkpoints without noticing.
    _tune_callback_checkpoint_cls = MyXGBoostCheckpointCallback
@staticmethod
def get_model(checkpoint: Checkpoint) -> xgb.Booster:
"""Retrieve the XGBoost model stored in this checkpoint."""
with checkpoint.as_directory() as checkpoint_path:
booster = xgb.Booster()
booster.load_model(
os.path.join(checkpoint_path, 'model.json')
)
if booster.attr('feature_names') is not None:
booster.feature_names = booster.attr(
'feature_names').split('|')
return booster
def _save_model(self, model: xgb.Booster, path: str) -> None:
if hasattr(model, 'feature_names'):
model.set_attr(feature_names='|'.join(model.feature_names))
model.save_model(os.path.join(path, 'model.json'))
dataset = ray.data.read_csv(
"s3://anonymous@air-example-data/breast_cancer.csv")
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
sync_config = SyncConfig(sync_artifacts=True)
run_config = RunConfig(
name=f"XGBoost_Test_Checkpoint_Save_Load",
storage_path="/NAS/ShareFolder/ray_debug",
checkpoint_config=CheckpointConfig(
checkpoint_frequency=1,
num_to_keep=10,
checkpoint_at_end=True,
checkpoint_score_attribute='train-error',
checkpoint_score_order='min',
),
failure_config=FailureConfig(max_failures=2),
sync_config=sync_config,
)
scaling_config = ScalingConfig(
num_workers=3,
placement_strategy="SPREAD",
use_gpu=False,
)
trainer = MyXGBoostTrainer(
scaling_config=scaling_config,
run_config=run_config,
label_column="target",
num_boost_round=30,
params={
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
},
datasets={"train": train_dataset, "valid": valid_dataset},
# HERE
callbacks=[MyXGBoostCheckpointCallback(
filename="model.json", frequency=1)],
)
result = trainer.fit()
print(checkpoint := result.get_best_checkpoint('valid-logloss', 'min'))
booster = MyXGBoostTrainer.get_model(checkpoint)
print(booster.num_boosted_rounds())
print(booster.feature_names)
Thank you for the investigation! The checkpoint_at_end and checkpoint_frequency settings do indeed go through different codepaths, and I was able to reproduce this with checkpoint_frequency=1. I'll put up a fix PR to clean this up!
What happened + What you expected to happen
When I finish XGBoost training using XGBoostTrainer, I want to continue training from the best checkpoint:
- resume_from_checkpoint failed to load the checkpoint
- XGBoostTrainer.get_model can't load the checkpoint either

The first issue's error message happens when creating a new trainer with resume_from_checkpoint and is quite similar to https://github.com/ray-project/ray/issues/16375. The error message becomes the same as the second one when I remove the early stopping config stop=ExperimentPlateauStopper('train-error', mode='min') in RunConfig.
The second issue might be relevant to https://github.com/ray-project/ray/issues/41374: either Ray saves the XGBoost model in the legacy binary format, or it cannot load a non-default model name from the checkpoint. The workaround there does not seem to work.
There are warning logs like this, and using XGBoostTrainer.get_model(checkpoint) will get an error.
Versions / Dependencies
Python 3.8.13
Packages
OS
Reproduction script
The reproduction script is based on the official tutorial Get Started with XGBoost and LightGBM — Ray 2.8.0
Load data and do the first training
During fitting, I will get warnings like this
Get the Best Checkpoint and Resume
This will get an error like this when early stopping is enabled
And an error like this without early stopping
Which will be the same as
Issue Severity
High: It blocks me from completing my task.