ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[core] AttributeError: Can't get attribute 'CheckpointConfig' on <module 'opt.config.utils' from '/home/ray/anaconda3/lib/python3.9/site-packages/opt/config/utils.py'> #48246

Open wassimbensalem opened 1 month ago

wassimbensalem commented 1 month ago

What happened + What you expected to happen

I am doing a simple training run with Ray, but it fails because it cannot get the CheckpointConfig attribute.

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[14], line 1
----> 1 model_ray = self._refit_with_ray(train_data_init, val_data_init)
      2 # model_direct = self._refit(train_data_init, val_data_init)

File /var/www/optimization-library/opt/tuners/compredict_tuner.py:513, in CompredictTuner._refit_with_ray(self, train_data, val_data)
    511 trainer._param_dict['scaling_config'] = trainer.scaling_config
    512 print("trainer.scaling_config ", trainer.scaling_config)
--> 513 trainer.fit()
    515 # load best weights
    516 model = self.trainable.build_model(config, self.input_shape, self.target_shape)

File /usr/local/lib/python3.9/site-packages/ray/train/base_trainer.py:583, in BaseTrainer.fit(self)
    580 from ray.tune.tuner import Tuner, TunerInternal
    581 from ray.tune import TuneError
--> 583 trainable = self.as_trainable()
    584 param_space = self._extract_fields_for_tuner_param_space()
    586 if self._restore_path:

File /usr/local/lib/python3.9/site-packages/ray/train/base_trainer.py:837, in BaseTrainer.as_trainable(self)
    834 trainable_cls = self._generate_trainable_cls()
    836 # Wrap with tune.with_parameters to handle very large values in base_config
--> 837 return tune.with_parameters(trainable_cls, **base_config)

File /usr/local/lib/python3.9/site-packages/ray/tune/trainable/util.py:293, in with_parameters(trainable, **kwargs)
    291 prefix = f"{str(trainable)}"
    292 for k, v in kwargs.items():
--> 293     parameter_registry.put(prefix + k, v)
    295 trainable_name = getattr(trainable, "__name__", "tune_with_parameters")
    296 keys = set(kwargs.keys())

File /usr/local/lib/python3.9/site-packages/ray/tune/registry.py:296, in _ParameterRegistry.put(self, k, v)
    294 self.to_flush[k] = v
    295 if ray.is_initialized():
--> 296     self.flush()

File /usr/local/lib/python3.9/site-packages/ray/tune/registry.py:308, in _ParameterRegistry.flush(self)
    306         self.references[k] = v
    307     else:
--> 308         self.references[k] = ray.put(v)
    309 self.to_flush.clear()

File /usr/local/lib/python3.9/site-packages/ray/_private/auto_init_hook.py:24, in wrap_auto_init.<locals>.auto_init_wrapper(*args, **kwargs)
     21 @wraps(fn)
     22 def auto_init_wrapper(*args, **kwargs):
     23     auto_init_ray()
---> 24     return fn(*args, **kwargs)

File /usr/local/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:102, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
     98 if client_mode_should_convert():
     99     # Legacy code
    100     # we only convert init function if RAY_CLIENT_MODE=1
    101     if func.__name__ != "init" or is_client_mode_enabled_by_default:
--> 102         return getattr(ray, func.__name__)(*args, **kwargs)
    103 return func(*args, **kwargs)

File /usr/local/lib/python3.9/site-packages/ray/util/client/api.py:52, in _ClientAPI.put(self, *args, **kwargs)
     44 def put(self, *args, **kwargs):
     45     """put is the hook stub passed on to replace ray.put
     46
     47     Args:
   (...)
     50         kwargs: opaque keyword arguments
     51     """
---> 52     return self.worker.put(*args, **kwargs)

File /usr/local/lib/python3.9/site-packages/ray/util/client/worker.py:496, in Worker.put(self, val, client_ref_id, _owner)
    488     raise TypeError(
    489         "Calling 'put' on an ObjectRef is not allowed "
    490         "(similarly, returning an ObjectRef from a remote "
   (...)
    493         "call 'put' on it (or return it)."
    494     )
    495 data = dumps_from_client(val, self._client_id)
--> 496 return self._put_pickled(data, client_ref_id, _owner)

File /usr/local/lib/python3.9/site-packages/ray/util/client/worker.py:510, in Worker._put_pickled(self, data, client_ref_id, owner)
    508 if not resp.valid:
    509     try:
--> 510         raise cloudpickle.loads(resp.error)
    511     except (pickle.UnpicklingError, TypeError):
    512         logger.exception("Failed to deserialize {}".format(resp.error))

AttributeError: Can't get attribute 'CheckpointConfig' on <module 'opt.config.utils' from '/home/ray/anaconda3/lib/python3.9/site-packages/opt/config/utils.py'>
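
If it helps: this looks like the standard pickle-by-reference failure. The class is serialized on the driver as a (module, attribute) reference, and the Python environment that deserializes it (the cluster's /home/ray/anaconda3 environment, judging by the path above) has an opt.config.utils that does not define CheckpointConfig, for example because a different version of the opt package is installed there. Below is a minimal, Ray-independent sketch that produces the same message; the module name is only a stand-in.

import pickle
import sys
import types

# Stand-in for the driver-side module that defines the class (hypothetical name).
mod = types.ModuleType("opt_demo_utils")
sys.modules["opt_demo_utils"] = mod

class CheckpointConfig:
    pass

# Pretend the class lives in that module, the way the real one lives in opt.config.utils.
CheckpointConfig.__module__ = "opt_demo_utils"
mod.CheckpointConfig = CheckpointConfig

# Pickle stores only a reference: (module name, attribute name).
payload = pickle.dumps(CheckpointConfig())

# Simulate the deserializing side having a module version without the class.
del mod.CheckpointConfig

try:
    pickle.loads(payload)
except AttributeError as err:
    print(err)  # Can't get attribute 'CheckpointConfig' on <module 'opt_demo_utils'>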

Versions / Dependencies

Ray: 2.6.3
Python: 3.9.15

Reproduction script

model_ray = self._refit_with_ray(train_data_init, val_data_init)

With:

def _refit_with_ray(self, train_data, val_data):
        """Refit model with best hyperparameters using Ray.
        :param train_data: Tuple of lists of numpy arrays ([X,] [y]) for training.
        :param val_data: Tuple of lists of numpy arrays ([X], [y]) for validation.
        """
        def get_data(train_data, val_data, batch_size):
            train_dataset = self.trainable.generator(train_data, self.gen_params, shuffle=True)
            val_dataset = self.trainable.generator(val_data, self.gen_params, shuffle=False)
            self.train_data = train_dataset.batch(batch_size)
            self.val_data = val_dataset.batch(128)

        def train_func(config: dict):
            get_data(train_data, val_data, config.get("batch_size", 64))

            loss = keras.losses.get(self.loss_fn)
            if isinstance(self.optimizer, str):
                opt_config = dict(learning_rate=config["learning_rate"])
                opt_identifier = dict(class_name=self.optimizer, config=opt_config)
            else:
                opt_identifier = self.optimizer
                opt_identifier["config"]["learning_rate"] = config["learning_rate"]
            opt = keras.optimizers.get(opt_identifier)

            model = self.trainable.build_model(config, self.input_shape, self.target_shape)
            model.compile(loss=loss, optimizer=opt)

            # either use default callbacks
            if self.refit_callbacks is None:
                lr_schedule = keras.callbacks.ReduceLROnPlateau(
                    monitor="val_loss", factor=0.2, patience=4
                )
                early_stop = keras.callbacks.EarlyStopping(
                    "val_loss", patience=self.refit_early_stop_patience, verbose=1, mode="min"
                )
                self.callbacks = [lr_schedule, early_stop]
            # or use callbacks passed by user
            else:
                self.callbacks = self.refit_callbacks

            # model checkpointing is always included
            if not os.path.exists(os.path.join(self.checkpoint_config.path, self.id)):
                os.makedirs(os.path.join(self.checkpoint_config.path, self.id))
            checkpoint_filepath = os.path.join(
                self.checkpoint_config.path, self.id, "checkpoint.weights.h5"
            )
            model_checkpoint = keras.callbacks.ModelCheckpoint(
                filepath=checkpoint_filepath,
                save_weights_only=True,
                monitor="val_loss",
                mode="min",
                save_best_only=True,
            )
            self.callbacks.append(model_checkpoint)

            initial_epoch = 0
            # Define training checkpoint callback and try to restore from last checkpoint
            if self.checkpoint_config.enable:
                # Define final model checkpoint paths
                callback_state_path = os.path.join(
                    self.checkpoint_config.path, self.id, "callback_checkpoint"
                )
                training_state_path = os.path.join(
                    self.checkpoint_config.path, self.id, "training_checkpoint"
                )

                # Define model and optimizer checkpoint
                checkpoint = tf.train.Checkpoint(optimizer=model.optimizer, model=model)

                # Define callback to save training state
                training_checkpoint = TrainingCheckpoint(
                    model=model,
                    tf_checkpoint=checkpoint,
                    callbacks=self.callbacks,
                    epoch_interval=self.checkpoint_config.interval,
                    callback_state_path=callback_state_path,
                    training_state_path=training_state_path,
                )

                # Try to restore from last training state saved
                if os.path.exists(training_state_path + ".index"):
                    try:
                        initial_epoch = training_checkpoint.restore_state()
                        print(
                            f"Training restored from checkpoint in epoch {initial_epoch}."
                        )
                    except Exception as e:
                        print(f"Could not restore from checkpoint. Error message: {e}")
                        print("Continuing training from scratch...")

                # include training checkpoint callback
                self.callbacks.append(training_checkpoint)

            try:
                history = model.fit(
                    self.train_data,
                    validation_data=self.val_data,
                    epochs=self.refit_epochs_n,
                    callbacks=self.callbacks,
                    verbose=self.opt_verbose
                )
                results = history.history

            except KeyboardInterrupt:
                logger.debug("\n Model refitting was interrupted by the user.\n Load best model checkpoint...")
                results = None

            return results

        config = self._best_parameters

        nof_gpus_per_worker = int(os.environ.get("OPT_NOF_GPUS", 1))
        num_workers = int(os.environ.get("OPT_NOF_WORKERS", 1))
        use_gpu = nof_gpus_per_worker > 0

        print("Use gpu", use_gpu)
        resources_per_worker = dict(GPU=nof_gpus_per_worker) if nof_gpus_per_worker > 0 else {"CPU": 1}
        print('Config', config)
        trainer = TensorflowTrainer(
            train_loop_per_worker=train_func,
            train_loop_config=config,
            scaling_config=ScalingConfig(num_workers=1, use_gpu=True, resources_per_worker={'GPU': 1})
        )
        trainer.scaling_config.num_workers = num_workers
        trainer.scaling_config.use_gpu = use_gpu
        trainer.scaling_config.resources_per_worker = resources_per_worker
        trainer._param_dict['scaling_config'] = trainer.scaling_config
        print("trainer.scaling_config ", trainer.scaling_config)
        trainer.fit()

        # load best weights
        model = self.trainable.build_model(config, self.input_shape, self.target_shape)
        model.load_weights(os.path.join(self.checkpoint_config.path, self.id, "checkpoint.weights.h5"))

        return model
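
One more data point that may help isolate it: the traceback shows the failure happens inside trainer.fit() while tune.with_parameters ships the trainer's config to the cluster via ray.put, i.e. before any training starts. Assuming the same Ray client connection, something along these lines (address and constructor arguments are placeholders) should reproduce it without the trainer at all, since with the Ray client ray.put serializes on the driver and deserializes on the server:

import ray
from opt.config.utils import CheckpointConfig  # the class named in the error

ray.init(address="ray://<head-node>:10001")  # placeholder cluster address

# Round-trip the suspect object through the cluster. If the cluster's opt
# package does not define CheckpointConfig, this should raise the same
# AttributeError as trainer.fit().
cfg = CheckpointConfig()  # construct however the library expects; placeholder
ref = ray.put(cfg)
print(ray.get(ref))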

Issue Severity

High: It blocks me from completing my task.

justinvyu commented 3 days ago

Could you try upgrading to a newer version of Ray? There are many stability improvements in Ray Train since Ray 2.6.
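
For example, something along these lines (versions, addresses, and package names are placeholders): upgrade the driver with pip install -U "ray[train]", and make sure the cluster side imports the same version of the package that defines CheckpointConfig, e.g. by pinning it in the job's runtime environment when connecting:

import ray

# Placeholder address and package pin: the goal is that the driver and the
# cluster deserialize with identical versions of the package that defines
# CheckpointConfig (opt.config.utils in this case).
ray.init(
    address="ray://<head-node>:10001",
    runtime_env={"pip": ["opt==<version-matching-the-driver>"]},
)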