ray-project / xgboost_ray

Distributed XGBoost on Ray
Apache License 2.0

Error for Distributed Modin + XGBoost on Ray Client #73

Closed richardliaw closed 3 years ago

richardliaw commented 3 years ago

Hey @krfricke and @richardliaw, I'm running into issues when passing a Modin dataframe to xgboost_ray while connected through Ray Client. This is what the code looks like:

import ray
import ray.util
ray.util.connect("<service_ip>:50051")

import ray.util.xgboost
from xgboost_ray import RayDMatrix, RayParams, train, RayFileType
import modin.pandas as pd

colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
df = pd.read_csv("s3://<>/HIGGS.csv", names=colnames)

dtrain = RayDMatrix(df, label="label")
config = {
    "tree_method": "hist",
    "eval_metric": ["logloss", "error"],
}

evals_result = {}

bst = train(
    params=config,
    dtrain=dtrain,
    evals_result=evals_result,
    ray_params=RayParams(max_actor_restarts=1, num_actors=8, cpus_per_actor=2),
    num_boost_round=100,
    evals=[(dtrain, "train")])
bst.save_model("higgs_huge.xgb")

It works when I execute this chunk as a remote function. The error is related to Modin not being able to detect the Ray Client runtime when it is called by xgboost_ray. Modin works fine otherwise, including when loading the CSV itself.
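A rough sketch of the "execute this chunk as a remote function" workaround, assuming the same cluster address and CSV path as in the snippet above. The names `run_training` and `main` are hypothetical, and placing the Modin import inside the function body is an assumption about why this variant works (Modin would then initialize on a worker that already has a Ray runtime), not something confirmed in this thread:

```python
def run_training(csv_path):
    """Hypothetical helper: the original snippet, meant to run inside a Ray task."""
    # Assumption: importing here lets Modin bind to the cluster-side Ray
    # runtime instead of the Ray Client process.
    import modin.pandas as pd
    from xgboost_ray import RayDMatrix, RayParams, train

    colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
    df = pd.read_csv(csv_path, names=colnames)

    dtrain = RayDMatrix(df, label="label")
    evals_result = {}
    bst = train(
        params={"tree_method": "hist", "eval_metric": ["logloss", "error"]},
        dtrain=dtrain,
        evals_result=evals_result,
        ray_params=RayParams(max_actor_restarts=1, num_actors=8, cpus_per_actor=2),
        num_boost_round=100,
        evals=[(dtrain, "train")],
    )
    # Return the booster so the client can save it locally.
    return bst, evals_result


def main(address, csv_path):
    """Hypothetical driver: connect via Ray Client and submit the task."""
    import ray
    import ray.util

    ray.util.connect(address)
    # Only the CSV path (a plain string) crosses the client boundary,
    # so no Modin dataframe is pickled on the client side.
    remote_training = ray.remote(run_training)
    bst, evals_result = ray.get(remote_training.remote(csv_path))
    bst.save_model("higgs_huge.xgb")
    return evals_result
```

Calling `main("<service_ip>:50051", "s3://<>/HIGGS.csv")` requires a reachable Ray Client server; nothing runs until `main` is called.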

This is what the stack trace looks like:

---------------------------------------------------------------------------
RaySystemError                            Traceback (most recent call last)
<ipython-input-4-113fdd28eb1f> in <module>()
     20     ray_params=RayParams(max_actor_restarts=1, num_actors=8, cpus_per_actor=2),
     21     num_boost_round=100,
---> 22     evals=[(dtrain, "train")])
     23 bst.save_model("higgs_huge.xgb")

/usr/local/lib/python3.7/site-packages/xgboost_ray/main.py in train(params, dtrain, num_boost_round, evals, evals_result, additional_results, ray_params, _remote, *args, **kwargs)
   1073                 ray_params=ray_params,
   1074                 _remote=False,
-> 1075                 **kwargs,
   1076             ))
   1077         if isinstance(evals_result, dict):

/usr/local/lib/python3.7/site-packages/ray/remote_function.py in _remote_proxy(*args, **kwargs)
    102         @wraps(function)
    103         def _remote_proxy(*args, **kwargs):
--> 104             return self._remote(args=args, kwargs=kwargs)
    105 
    106         self.remote = _remote_proxy

/usr/local/lib/python3.7/site-packages/ray/remote_function.py in _remote(self, args, kwargs, num_returns, num_cpus, num_gpus, memory, object_store_memory, accelerator_type, resources, max_retries, placement_group, placement_group_bundle_index, placement_group_capture_child_tasks, runtime_env, override_environment_variables, name)
    207                 runtime_env=runtime_env,
    208                 override_environment_variables=override_environment_variables,
--> 209                 name=name)
    210 
    211         worker = ray.worker.global_worker

/usr/local/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs)
     70         setattr(func_cls, RAY_CLIENT_MODE_ATTR, key)
     71     client_func = ray._get_converted(key)
---> 72     return client_func._remote(in_args, in_kwargs, **kwargs)
     73 
     74 

/usr/local/lib/python3.7/site-packages/ray/util/client/common.py in _remote(self, args, kwargs, **option_args)
    104         if kwargs is None:
    105             kwargs = {}
--> 106         return self.options(**option_args).remote(*args, **kwargs)
    107 
    108     def __repr__(self):

/usr/local/lib/python3.7/site-packages/ray/util/client/common.py in remote(self, *args, **kwargs)
    281 
    282     def remote(self, *args, **kwargs):
--> 283         return return_refs(ray.call_remote(self, *args, **kwargs))
    284 
    285     def __getattr__(self, key):

/usr/local/lib/python3.7/site-packages/ray/util/client/api.py in call_remote(self, instance, *args, **kwargs)
     94             kwargs: opaque keyword arguments
     95         """
---> 96         return self.worker.call_remote(instance, *args, **kwargs)
     97 
     98     def call_release(self, id: bytes) -> None:

/usr/local/lib/python3.7/site-packages/ray/util/client/worker.py in call_remote(self, instance, *args, **kwargs)
    287         task = instance._prepare_client_task()
    288         for arg in args:
--> 289             pb_arg = convert_to_arg(arg, self._client_id)
    290             task.args.append(pb_arg)
    291         for k, v in kwargs.items():

/usr/local/lib/python3.7/site-packages/ray/util/client/client_pickler.py in convert_to_arg(val, client_id)
    174     out = ray_client_pb2.Arg()
    175     out.local = ray_client_pb2.Arg.Locality.INTERNED
--> 176     out.data = dumps_from_client(val, client_id)
    177     return out

/usr/local/lib/python3.7/site-packages/ray/util/client/client_pickler.py in dumps_from_client(obj, client_id, protocol)
    154         with io.BytesIO() as file:
    155             cp = ClientPickler(client_id, file, protocol=protocol)
--> 156             cp.dump(obj)
    157             return file.getvalue()
    158 

/usr/local/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    578     def dump(self, obj):
    579         try:
--> 580             return Pickler.dump(self, obj)
    581         except RuntimeError as e:
    582             if "recursion" in e.args[0]:

/usr/local/lib/python3.7/site-packages/modin/pandas/dataframe.py in __reduce__(self)
   2430 
   2431     def __reduce__(self):
-> 2432         self._query_compiler.finalize()
   2433         if PersistentPickle.get():
   2434             return self._inflate_full, (self._to_pandas(),)

/usr/local/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py in finalize(self)
    203 
    204     def finalize(self):
--> 205         self._modin_frame.finalize()
    206 
    207     def to_pandas(self):

/usr/local/lib/python3.7/site-packages/modin/engines/base/frame/data.py in finalize(self)
   2138         that were used to build it.
   2139         """
-> 2140         [part.drain_call_queue() for row in self._partitions for part in row]

/usr/local/lib/python3.7/site-packages/modin/engines/base/frame/data.py in <listcomp>(.0)
   2138         that were used to build it.
   2139         """
-> 2140         [part.drain_call_queue() for row in self._partitions for part in row]

/usr/local/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/partition.py in drain_call_queue(self)
     88             self._width_cache,
     89             self._ip_cache,
---> 90         ) = deploy_ray_func.remote(call_queue, oid)
     91         self.call_queue = []
     92 

/usr/local/lib/python3.7/site-packages/ray/remote_function.py in _remote_proxy(*args, **kwargs)
    102         @wraps(function)
    103         def _remote_proxy(*args, **kwargs):
--> 104             return self._remote(args=args, kwargs=kwargs)
    105 
    106         self.remote = _remote_proxy

/usr/local/lib/python3.7/site-packages/ray/remote_function.py in _remote(self, args, kwargs, num_returns, num_cpus, num_gpus, memory, object_store_memory, accelerator_type, resources, max_retries, placement_group, placement_group_bundle_index, placement_group_capture_child_tasks, runtime_env, override_environment_variables, name)
    210 
    211         worker = ray.worker.global_worker
--> 212         worker.check_connected()
    213 
    214         # If this function was not exported in this session and job, we need to

/usr/local/lib/python3.7/site-packages/ray/worker.py in check_connected(self)
    202         """
    203         if not self.connected:
--> 204             raise RaySystemError("Ray has not been started yet. You can "
    205                                  "start Ray with 'ray.init()'.")
    206 

RaySystemError: System error: Ray has not been started yet. You can start Ray with 'ray.init()'.
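Reading the trace from the bottom up: pickling the dataframe calls `__reduce__`, which calls `finalize()`, which submits a remote task through the regular (non-client) Ray worker, and that worker reports it is not connected. The toy model below imitates that chain with the standard library only. `FakeRuntime`, `LazyFrame`, and `try_pickle` are hypothetical names for illustration and are not Ray or Modin APIs:

```python
import pickle


class FakeRuntime:
    """Hypothetical stand-in for a local worker that may not be connected."""

    def __init__(self):
        self.connected = False

    def check_connected(self):
        if not self.connected:
            raise RuntimeError("runtime has not been started yet")


LOCAL_RUNTIME = FakeRuntime()


class LazyFrame:
    """Hypothetical stand-in for a dataframe with queued, deferred work."""

    def __init__(self, values):
        self.values = values
        self.call_queue = ["pending-op"]

    def finalize(self):
        # Flushing the queue needs the local runtime, like drain_call_queue()
        # submitting a remote task in the trace above.
        LOCAL_RUNTIME.check_connected()
        self.call_queue = []

    def __reduce__(self):
        # Pickling has a side effect: deferred work is flushed first.
        self.finalize()
        return (LazyFrame, (self.values,))


def try_pickle(frame):
    """Return "ok" if pickling succeeds, else a short failure message."""
    try:
        pickle.dumps(frame)
        return "ok"
    except RuntimeError as exc:
        return f"failed: {exc}"
```

With `LOCAL_RUNTIME.connected` left at `False`, `try_pickle(LazyFrame([1, 2, 3]))` reports a failure even though no explicit runtime call was made by the caller, which mirrors how passing the dataframe as an argument is enough to trigger the error here.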

_Originally posted by @Bhavya6187 in https://github.com/ray-project/xgboost_ray/issues/32#issuecomment-804430451_

Bhavya6187 commented 3 years ago

This issue might be independent of this library. Here's a smaller repro:

import ray
import ray.util
ray.util.connect("<service_ip>:50051")

import modin.pandas as pd

colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
df = pd.read_csv("s3://<>/HIGGS.csv", names=colnames)

@ray.remote
def add_rows(df_input):
    # Add a "sum" column computed row by row from the first three features.
    for i, row in df_input.iterrows():
        df_input.at[i, "sum"] = row["feature-01"] + row["feature-02"] + row["feature-03"]
    return df_input

df_2 = ray.get(add_rows.remote(df))

Should this be an issue on the main Ray repo?

krfricke commented 3 years ago

Hm yeah, it seems to be related to Ray Client and Modin, not necessarily to xgboost_ray (though I could imagine some problems here, too). Can you open an issue at the main Ray repo and link to this one? Let's keep this issue open, too, just to track general compatibility with Modin.

Bhavya6187 commented 3 years ago

Sure, created https://github.com/ray-project/ray/issues/14857 to track it.