mars-project / mars

Mars is a tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and Python functions.
https://mars-project.readthedocs.io
Apache License 2.0
2.68k stars 325 forks source link

[BUG] throw `AttributeError` when using mars tensor with `gpu=True` on colab #3331

Closed dlee992 closed 1 year ago

dlee992 commented 1 year ago

Using Mars==0.10.0, execute the code below in Google Colab. P.S. I used the Colab template from the answer at https://stackoverflow.com/questions/69407234/not-able-to-install-cudf-cupy-and-cuml-into-colab-with-rapids-ai-version-21-08, which installs and configures the RAPIDS runtime without any difficulty.

import mars.tensor as mt
a = mt.ones(10, gpu=True)  # indicate to create tensor on CUDA
a.sum().execute()          # execution will happen on CUDA

Traceback:

``` --------------------------------------------------------------------------- AttributeError Traceback (most recent call last) [/usr/local/lib/python3.8/dist-packages/IPython/core/formatters.py](https://localhost:8080/#) in __call__(self, obj) 700 type_pprinters=self.type_printers, 701 deferred_pprinters=self.deferred_printers) --> 702 printer.pretty(obj) 703 printer.flush() 704 return stream.getvalue() 24 frames [/usr/local/lib/python3.8/dist-packages/IPython/lib/pretty.py](https://localhost:8080/#) in pretty(self, obj) 400 if cls is not object \ 401 and callable(cls.__dict__.get('__repr__')): --> 402 return _repr_pprint(obj, self, cycle) 403 404 return _default_pprint(obj, self, cycle) [/usr/local/lib/python3.8/dist-packages/IPython/lib/pretty.py](https://localhost:8080/#) in _repr_pprint(obj, p, cycle) 695 """A pprint that just redirects to the normal repr function.""" 696 # Find newlines and replace them with p.break_() --> 697 output = repr(obj) 698 for idx,output_line in enumerate(output.splitlines()): 699 if idx: [/usr/local/lib/python3.8/dist-packages/mars/core/entity/core.py](https://localhost:8080/#) in __repr__(self) 102 103 def __repr__(self): --> 104 return self._data.__repr__() 105 106 def _check_data(self, data): [/usr/local/lib/python3.8/dist-packages/mars/tensor/core.py](https://localhost:8080/#) in __repr__(self) 241 242 def __repr__(self): --> 243 return self._to_str(representation=True) 244 245 @property [/usr/local/lib/python3.8/dist-packages/mars/tensor/core.py](https://localhost:8080/#) in _to_str(self, representation) 229 threshold = print_options["threshold"] 230 --> 231 corner_data = fetch_corner_data(self, session=self._executed_sessions[-1]) 232 # if less than default threshold, just set it as default, 233 # if not, set to corner_data.size - 1 make sure ... 
exists in repr [/usr/local/lib/python3.8/dist-packages/mars/tensor/utils.py](https://localhost:8080/#) in fetch_corner_data(tensor, session) 805 return np.block(corners.tolist()) 806 else: --> 807 return tensor.fetch(session=session) 808 809 [/usr/local/lib/python3.8/dist-packages/mars/core/entity/executable.py](https://localhost:8080/#) in fetch(self, session, **kw) 162 163 def fetch(self, session: SessionType = None, **kw): --> 164 return self._fetch(session=session, **kw) 165 166 def fetch_log( [/usr/local/lib/python3.8/dist-packages/mars/core/entity/executable.py](https://localhost:8080/#) in _fetch(self, session, **kw) 159 session = _get_session(self, session) 160 self._check_session(session, "fetch") --> 161 return fetch(self, session=session, **kw) 162 163 def fetch(self, session: SessionType = None, **kw): [/usr/local/lib/python3.8/dist-packages/mars/deploy/oscar/session.py](https://localhost:8080/#) in fetch(tileable, session, *tileables, **kwargs) 1924 1925 session = _ensure_sync(session) -> 1926 return session.fetch(tileable, *tileables, **kwargs) 1927 1928 [/usr/local/lib/python3.8/dist-packages/mars/deploy/oscar/session.py](https://localhost:8080/#) in fetch(self, *tileables, **kwargs) 1703 def fetch(self, *tileables, **kwargs) -> list: 1704 coro = _fetch(*tileables, session=self._isolated_session, **kwargs) -> 1705 return asyncio.run_coroutine_threadsafe(coro, self._loop).result() 1706 1707 @implements(AbstractSyncSession.fetch_infos) [/usr/lib/python3.8/concurrent/futures/_base.py](https://localhost:8080/#) in result(self, timeout) 442 raise CancelledError() 443 elif self._state == FINISHED: --> 444 return self.__get_result() 445 else: 446 raise TimeoutError() [/usr/lib/python3.8/concurrent/futures/_base.py](https://localhost:8080/#) in __get_result(self) 387 if self._exception: 388 try: --> 389 raise self._exception 390 finally: 391 # Break a reference cycle with the exception in self._exception 
[/usr/local/lib/python3.8/dist-packages/mars/deploy/oscar/session.py](https://localhost:8080/#) in _fetch(tileable, session, *tileables, **kwargs) 1892 tileable, tileables = tileable[0], tileable[1:] 1893 session = _get_isolated_session(session) -> 1894 data = await session.fetch(tileable, *tileables, **kwargs) 1895 return data[0] if len(tileables) == 0 else data 1896 [/usr/local/lib/python3.8/dist-packages/mars/deploy/oscar/session.py](https://localhost:8080/#) in fetch(self, *tileables, **kwargs) 1125 ): 1126 await fetcher.append(chunk.key, meta, fetch_info.indexes) -> 1127 fetched_data = await fetcher.get() 1128 for fetch_info, data in zip( 1129 itertools.chain(*fetch_infos_list), fetched_data [/usr/local/lib/python3.8/dist-packages/mars/services/task/execution/mars/fetcher.py](https://localhost:8080/#) in get(self) 50 for storage_api in self._storage_api_to_gets: 51 gets = self._storage_api_to_gets[storage_api] ---> 52 fetched_data = await storage_api.get.batch( 53 *map(operator.itemgetter(0), gets) 54 ) [/usr/local/lib/python3.8/dist-packages/mars/oscar/batch.py](https://localhost:8080/#) in _async_batch(self, args_list, kwargs_list) 144 return [] 145 elif len(args_list) == 1: --> 146 return [await self._async_call(*args_list[0], **kwargs_list[0])] 147 elif self.batch_func: 148 return await self.batch_func(args_list, kwargs_list) [/usr/local/lib/python3.8/dist-packages/mars/oscar/batch.py](https://localhost:8080/#) in _async_call(self, *args, **kwargs) 93 try: 94 if self.has_single_func: ---> 95 return await self.func(*args, **kwargs) 96 except NotImplementedError: 97 self.has_single_func = False [/usr/local/lib/python3.8/dist-packages/mars/services/storage/api/oscar.py](https://localhost:8080/#) in get(self, data_key, conditions, error) 88 self, data_key: str, conditions: List = None, error: str = "raise" 89 ) -> Any: ---> 90 return await self._storage_handler_ref.get( 91 self._session_id, data_key, conditions, error 92 ) 
[/usr/local/lib/python3.8/dist-packages/mars/oscar/backends/context.py](https://localhost:8080/#) in send(self, actor_ref, message, wait_response, profiling_context) 193 future = await self._call(actor_ref.address, message, wait=False) 194 if wait_response: --> 195 result = await self._wait(future, actor_ref.address, message) 196 return self._process_result_message(result) 197 else: [/usr/local/lib/python3.8/dist-packages/mars/oscar/backends/context.py](https://localhost:8080/#) in _wait(self, future, address, message) 87 except: # noqa: E722 # nosec # pylint: disable=bare-except 88 pass ---> 89 return await future 90 91 async def create_actor( [/usr/local/lib/python3.8/dist-packages/mars/oscar/backends/context.py](https://localhost:8080/#) in _wait(self, future, address, message) 78 async def _wait(self, future: asyncio.Future, address: str, message: _MessageBase): 79 try: ---> 80 await asyncio.shield(future) 81 except asyncio.CancelledError: 82 try: [/usr/local/lib/python3.8/dist-packages/mars/oscar/backends/core.py](https://localhost:8080/#) in _listen(self, client) 58 try: 59 try: ---> 60 message: _MessageBase = await client.recv() 61 except (EOFError, ConnectionError, BrokenPipeError): 62 # remote server closed, close client and raise ServerClosed [/usr/local/lib/python3.8/dist-packages/mars/oscar/backends/communication/base.py](https://localhost:8080/#) in recv(self) 260 @implements(Channel.recv) 261 async def recv(self): --> 262 return await self.channel.recv() 263 264 async def close(self): [/usr/local/lib/python3.8/dist-packages/mars/oscar/backends/communication/socket.py](https://localhost:8080/#) in recv(self) 85 async with self._recv_lock: 86 header = await deserializer.get_header() ---> 87 buffers = await read_buffers(header, self.reader) 88 return deserialize(header, buffers) 89 [/usr/local/lib/python3.8/dist-packages/mars/oscar/backends/communication/utils.py](https://localhost:8080/#) in read_buffers(header, reader) 79 buffers.append(content) 80 
else: ---> 81 cuda_buffer = CPBuffer.empty(buf_size) 82 cupy_memory = CPUnownedMemory(cuda_buffer.ptr, buf_size, cuda_buffer) 83 offset = 0 AttributeError: type object 'Buffer' has no attribute 'empty' ```

In contrast, Dask can execute the same computation without any error.

import dask
import dask.array as da
with dask.config.set({"array.backend": "cupy"}):
    darr = da.ones(10) # Get cupy-backed collection
    print(darr)
    result = darr.compute()
    print(f"{type(result)}: {result}")

outputs:

``` dask.array : [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.] ```
dlee992 commented 1 year ago

If this is a Mars bug, does anyone have an idea of how to fix it? I'm unfamiliar with the Mars source code, but I would like to give it a try.

dlee992 commented 1 year ago

I guess the PR https://github.com/mars-project/mars/pull/3330 will probably fix this issue, since it changes the way buffers are read and written for CUDA; see https://github.com/mars-project/mars/pull/3330/files#diff-e288ecb73e105160acd53edb9a8c00be733cc5587fc1b322c65db73aa1ba8c1aR43-R57