dask / dask-xgboost


[BUG] testcase failure - TypeError: can not initialize DMatrix from COO #51

Closed · ksangeek closed this issue 4 years ago

ksangeek commented 5 years ago

Describe the bug

pytest for test_sparse() fails with:

E   TypeError: can not initialize DMatrix from COO

Steps/Code to reproduce bug

This can be reproduced easily with xgboost 0.82 and 0.90:

============================= test session starts ==============================
platform linux -- Python 3.6.9, pytest-5.0.1, py-1.8.0, pluggy-0.12.0
rootdir: /home/sangeek/examples/dask-xgb-examples/tests
plugins: xdist-1.28.0, forked-1.0.2, cov-2.7.1
collected 1 item

test_sparse.py F                                                         [100%]

=================================== FAILURES ===================================
_________________________________ test_sparse __________________________________

    def test_func():
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

            async def coro():
                with dask.config.set(config):
                    s = False
                    for i in range(5):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster, retrying",
                                exc_info=True,
                            )
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs
                        )
                        args = [c] + args
                    try:
                        future = func(*args)
                        if timeout:
                            future = gen.with_timeout(
                                timedelta(seconds=timeout), future
                            )
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == "closed")
                        await end_cluster(s, workers)
                        await gen.with_timeout(
                            timedelta(seconds=1), cleanup_global_workers()
                        )
                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)
                    for i in range(5):
                        if all(c.closed() for c in Comm._instances):
                            break
                        else:
                            await gen.sleep(0.05)
                    else:
                        L = [c for c in Comm._instances if not c.closed()]
                        Comm._instances.clear()
                        # raise ValueError("Unclosed Comms", L)
                        print("Unclosed Comms", L)
                    return result

            result = loop.run_sync(
>               coro, timeout=timeout * 2 if timeout else timeout
            )

/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/distributed/utils_test.py:947:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/tornado/ioloop.py:532: in run_sync
    return future_cell[0].result()
/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/distributed/utils_test.py:915: in coro
    result = await future
/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/tornado/gen.py:742: in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
test_sparse.py:42: in test_sparse
    dbst = yield dxgb.train(c, param, dX, dy)
/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/tornado/gen.py:735: in run
    value = future.result()
/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/tornado/gen.py:742: in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/dask_xgboost/core.py:153: in _train
    results = yield client._gather(futures)
/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/tornado/gen.py:735: in run
    value = future.result()
/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/distributed/client.py:1668: in _gather
    six.reraise(type(exception), exception, traceback)
/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/six.py:692: in reraise
    raise value.with_traceback(tb)
/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/dask_xgboost/core.py:83: in train_part
    dtrain = xgb.DMatrix(data, labels, **dmatrix_kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>               ' {}'.format(type(data).__name__))
E       TypeError: can not initialize DMatrix from COO

/opt/anaconda3/envs/test-xgb-90-cpu/lib/python3.6/site-packages/xgboost/core.py:413: TypeError
----------------------------- Captured stderr call -----------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:38521
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:39659
distributed.worker - INFO - Listening to: tcp://127.0.0.1:39659
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:38521
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 1.16 TB
distributed.worker - INFO - Local Directory: /home/sangeek/examples/dask-xgb-examples/tests/dask-worker-space/worker-wievyhvt
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:40217
distributed.worker - INFO - Listening to: tcp://127.0.0.1:40217
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:38521
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 2
distributed.worker - INFO - Memory: 1.16 TB
distributed.worker - INFO - Local Directory: /home/sangeek/examples/dask-xgb-examples/tests/dask-worker-space/worker-v71yf5nk
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register tcp://127.0.0.1:39659
distributed.scheduler - INFO - Register tcp://127.0.0.1:40217
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39659
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:40217
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:38521
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:38521
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-8ea853d0-ce35-11e9-ae1e-590974dc444e
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Run out-of-band function 'start_tracker'
distributed.worker - WARNING - Compute Failed
Function: train_part
args: ({'DMLC_NUM_WORKER': 2, 'DMLC_TRACKER_URI': '127.0.0.1', 'DMLC_TRACKER_PORT': 9091}, {'max_depth': 2, 'eta': 1, 'silent': 1, 'objective': 'binary:logistic', 'nthread': 1}, [(, array([1, 0])), (, array([1, 1]))])
kwargs: {'dmatrix_kwargs': {'feature_names': None}}
Exception: TypeError('can not initialize DMatrix from COO',)
distributed.worker - WARNING - Compute Failed
Function: train_part
args: ({'DMLC_NUM_WORKER': 2, 'DMLC_TRACKER_URI': '127.0.0.1', 'DMLC_TRACKER_PORT': 9091}, {'max_depth': 2, 'eta': 1, 'silent': 1, 'objective': 'binary:logistic', 'nthread': 2}, [(, array([1, 0])), (, array([1, 0])), (, array([1, 1]))])
kwargs: {'dmatrix_kwargs': {'feature_names': None}}
Exception: TypeError('can not initialize DMatrix from COO',)
distributed.scheduler - INFO - Remove client Client-8ea853d0-ce35-11e9-ae1e-590974dc444e
distributed.scheduler - INFO - Remove client Client-8ea853d0-ce35-11e9-ae1e-590974dc444e
distributed.scheduler - INFO - Close client connection: Client-8ea853d0-ce35-11e9-ae1e-590974dc444e
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:40217
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39659
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:40217
distributed.core - INFO - Removing comms to tcp://127.0.0.1:40217
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:39659
distributed.core - INFO - Removing comms to tcp://127.0.0.1:39659
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
=========================== 1 failed in 2.13 seconds ===========================

Additional context

It looks like XGBoost does not support creating a DMatrix from sparse.COO. According to the documentation, xgboost.DMatrix(data, ...) only supports:

data (string/numpy.array/scipy.sparse/pd.DataFrame/dt.Frame) – Data source of DMatrix. When data is string type, it represents the path to a libsvm format txt file, or a binary file that xgboost can read from.

ref - https://xgboost.readthedocs.io/en/release_0.90/python/python_api.html
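
For reference, here is a minimal sketch (outside of dask-xgboost, assuming xgboost 0.90 and the pydata/sparse package) of the behaviour the documentation describes: DMatrix rejects a sparse.COO array but accepts the same data once it is converted to a scipy.sparse matrix.

```python
# Minimal sketch, not the dask-xgboost test itself.
import numpy as np
import sparse                       # pydata/sparse
import xgboost as xgb

X = np.array([[1.0, 0.0], [0.0, 2.0]])
y = np.array([0, 1])

coo = sparse.COO.from_numpy(X)      # pydata/sparse COO array
# xgb.DMatrix(coo, label=y)         # raises: TypeError: can not initialize DMatrix from COO

csr = coo.tocsr()                   # convert to scipy.sparse.csr_matrix
dtrain = xgb.DMatrix(csr, label=y)  # accepted, since scipy.sparse is supported
```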

I see that changing the test to use scipy.sparse.csr_matrix instead of sparse.COO gets me past this issue (a fuller sketch follows the diff below) -

228c228
<     dX = da.from_array(X, chunks=(2, 2)).map_blocks(sparse.COO)
---
>     dX = da.from_array(X, chunks=(2, 2)).map_blocks(scipy.sparse.csr_matrix)
237c237
<     _test_container(dbst, predictions_result, sparse.COO)
---
>     _test_container(dbst, predictions_result, scipy.sparse.csr_matrix)
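
In context, the changed test setup would look roughly like the sketch below; the data, chunking, and labels here are illustrative rather than the exact dask-xgboost test code.

```python
# Rough sketch of the test setup after the change; X, y and the chunking are illustrative.
import numpy as np
import scipy.sparse
import dask.array as da

X = np.random.random((10, 4))
X[X < 0.5] = 0.0                 # zero out entries so the data is genuinely sparse
y = np.random.randint(0, 2, size=10)

# each block of dX is now a scipy.sparse.csr_matrix rather than a sparse.COO array
dX = da.from_array(X, chunks=(2, 2)).map_blocks(scipy.sparse.csr_matrix)
dy = da.from_array(y, chunks=2)
```
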
TomAugspurger commented 5 years ago

IIRC, we originally used sparse.COO because it works better within dask arrays. If using a scipy.sparse matrix doesn't work, I'd recommend just skipping the test.

ksangeek commented 5 years ago

Hi @TomAugspurger, thanks for your input. I can make the change to skip this test.
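
For concreteness, skipping could look something like the sketch below (a hypothetical marker and reason string, not necessarily the exact patch that went into the PR):

```python
# Hypothetical skip marker for the failing test.
import pytest

@pytest.mark.skip(reason="xgboost cannot build a DMatrix from sparse.COO chunks")
def test_sparse():
    ...
```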

Can you please help me get past the CI issue "Too long with no output (exceeded 10m0s)" seen in this PR - https://github.com/dask/dask-xgboost/pull/50? I was able to run the tests successfully on my local machine!

TomAugspurger commented 5 years ago

Thanks, I'm not sure why that would be happening right now.

ksangeek commented 4 years ago

This is closed via https://github.com/dask/dask-xgboost/pull/50. Thanks @TomAugspurger!