dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

ValueError: memoryview is too large (dask.array.histogram) #11046

Open lsc64 opened 3 months ago

lsc64 commented 3 months ago

Describe the issue: I'm trying to compute a histogram over a 12 TB array of pairwise distances, and it fails. It either raises `ValueError: memoryview is too large`:

```
http://127.0.0.1:8787/status
/opt/conda/lib/python3.11/site-packages/distributed/client.py:3169: UserWarning: Sending large graph of size 12.22 GiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
2024-04-11 16:18:18,764 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/conda/lib/python3.11/site-packages/msgpack/__init__.py", line 36, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 272, in msgpack._cmsgpack.Packer._pack
ValueError: memoryview is too large
2024-04-11 16:18:18,768 - distributed.comm.utils - ERROR - memoryview is too large
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/distributed/comm/utils.py", line 34, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/opt/conda/lib/python3.11/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/conda/lib/python3.11/site-packages/msgpack/__init__.py", line 36, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 272, in msgpack._cmsgpack.Packer._pack
ValueError: memoryview is too large
2024-04-11 16:18:18,770 - distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/opt/conda/lib/python3.11/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/opt/conda/lib/python3.11/site-packages/distributed/comm/tcp.py", line 264, in write
    frames = await to_frames(
  File "/opt/conda/lib/python3.11/site-packages/distributed/comm/utils.py", line 50, in to_frames
    return _to_frames()
  File "/opt/conda/lib/python3.11/site-packages/distributed/comm/utils.py", line 34, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/opt/conda/lib/python3.11/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/conda/lib/python3.11/site-packages/msgpack/__init__.py", line 36, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 272, in msgpack._cmsgpack.Packer._pack
ValueError: memoryview is too large
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
Cell In[5], line 4
      2 distances = pairwise_distances(darr, darr.compute(), metric="cosine")
      3 hist, bins = da.histogram(distances, bins=100, range=[0,2])
----> 4 hist.compute()

File /opt/conda/lib/python3.11/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File /opt/conda/lib/python3.11/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /opt/conda/lib/python3.11/site-packages/distributed/client.py:2245, in Client._gather(self, futures, errors, direct, local_worker)
   2243 else:
   2244     raise exception.with_traceback(traceback)
-> 2245 raise exc
   2246 if errors == "skip":
   2247     bad_keys.add(key)

CancelledError: ('sum-aggregate-147f6c70731d6bcc47a228e1974422e8', 0)
```

or the computation is simply cancelled:

```
/opt/conda/lib/python3.11/site-packages/distributed/client.py:3169: UserWarning: Sending large graph of size 12.22 GiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
Cell In[18], line 4
      2 distances = pairwise_distances(darr, darr.compute(), metric="cosine")
      3 hist, bins = da.histogram(distances, bins=100, range=[0,2])
----> 4 hist.compute()

File /opt/conda/lib/python3.11/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File /opt/conda/lib/python3.11/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /opt/conda/lib/python3.11/site-packages/distributed/client.py:2245, in Client._gather(self, futures, errors, direct, local_worker)
   2243 else:
   2244     raise exception.with_traceback(traceback)
-> 2245 raise exc
   2246 if errors == "skip":
   2247     bad_keys.add(key)

CancelledError: ('sum-aggregate-3a00e4279a13a440e2503725c63a585b', 0)
```

Minimal Complete Verifiable Example:

```python
import dask.array as da
from dask_ml.metrics import pairwise_distances

darr = da.random.normal(size=(1281167, 1280))
# Y must be a NumPy array, so the whole of darr is materialized here
distances = pairwise_distances(darr, darr.compute(), metric="cosine")
hist, bins = da.histogram(distances, bins=100, range=[0, 2])
hist.compute()  # error
```

Anything else we need to know?: Computing the histogram of an array of the same size as the distance matrix, without `pairwise_distances`, works:

```python
darr = da.random.normal(size=(1281167, 1281167))
hist, bins = da.histogram(darr, bins=100, range=[0, 2])
hist.compute()  # works
```

Environment:

fjetter commented 3 months ago

I strongly recommend not passing the computed array `darr.compute()` to `pairwise_distances`; it should just be `darr`. Otherwise the entire array is materialized in memory, and all of it has to be sent over the network.

I suspect that the bug you are running into is already fixed in https://github.com/dask/distributed/pull/8507, but you wouldn't have a good time submitting ~12 GiB from your client to the scheduler.
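For scale, a back-of-the-envelope check of the sizes in the MCVE, assuming float64 data (8 bytes per element, the dtype `da.random.normal` produces). The NumPy `Y` (that is, `darr.compute()`) comes out at about 12.22 GiB, matching the "Sending large graph of size 12.22 GiB" warning, while the full distance matrix is about 13 TB (roughly 11.9 TiB).

```python
# Shapes from the MCVE; float64 assumed (8 bytes per element)
n_rows, n_features = 1281167, 1280
itemsize = 8

y_bytes = n_rows * n_features * itemsize  # darr.compute(), the NumPy Y
dist_bytes = n_rows * n_rows * itemsize   # full pairwise distance matrix

y_gib = y_bytes / 2**30
dist_tb = dist_bytes / 1e12
dist_tib = dist_bytes / 2**40

print(f"Y: {y_gib:.2f} GiB")
print(f"distances: {dist_tb:.2f} TB ({dist_tib:.2f} TiB)")
```

So what travels from the client to the scheduler is the ~12 GiB `Y`; the ~13 TB distance matrix only ever exists as lazy chunks.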

lsc64 commented 3 months ago

> I strongly recommend not passing the computed array `darr.compute()` to `pairwise_distances`; it should just be `darr`. Otherwise the entire array is materialized in memory, and all of it has to be sent over the network.
>
> I suspect that the bug you are running into is already fixed in dask/distributed#8507, but you wouldn't have a good time submitting ~12 GiB from your client to the scheduler.

Unfortunately, that function requires a dask.array and a numpy.array; otherwise it would of course be nicer not to do that. https://github.com/dask/dask-ml/blob/b95ba909c6dcd37c566f5193ba0b918396edaaee/dask_ml/metrics/pairwise.py#L58

```python
if isinstance(Y, da.Array):
    raise TypeError("`Y` must be a numpy array")
```

If I batch the materialized array into slices of 100k rows (which reduces the graph size), it works, so you're probably right!

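```python
import dask.array as da
from dask_ml.metrics import pairwise_distances
from tqdm import tqdm

hists = []
batch_size = 100000
n_rows = darr.shape[0]
# Ceiling division, so a final partial batch is not silently dropped
n_batches = (n_rows + batch_size - 1) // batch_size
for batch in tqdm(range(n_batches)):
    # Materialize only one slice of rows as the NumPy `Y` argument
    y_batch = darr[batch * batch_size : min((batch + 1) * batch_size, n_rows)].compute()
    distances = pairwise_distances(darr, y_batch, metric="cosine")
    hist, bins = da.histogram(distances, bins=100, range=[0, 2])
    hists.append(hist)
da.compute(hists)  # works, still computes everything at once
```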

Do I have the patch if I install from source?

fjetter commented 3 months ago

> Unfortunately, that function requires a dask.array and a numpy.array

Sorry, I missed that. I haven't looked at your batching code closely enough to confirm that it is correct. If it is, you may want to contribute it to dask-ml, since a "proper" dask algorithm would work similarly. I don't know enough about the `pairwise_distances` algorithm to tell.
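Whether the batching is correct mostly comes down to one property: with fixed `bins` and `range`, the histograms of disjoint column blocks of the distance matrix add up to the histogram of the whole matrix, provided every block (including a final partial one) is covered. A small NumPy sketch that checks this on toy data; `cosine_distances` is a hypothetical helper written for illustration, not dask-ml's implementation, and the full matrix is computed once only as a reference.

```python
import numpy as np


def cosine_distances(a, b):
    """Hypothetical helper: 1 - cosine similarity between rows of a and b."""
    a_n = a / np.linalg.norm(a, axis=1, keepdims=True)
    b_n = b / np.linalg.norm(b, axis=1, keepdims=True)
    # Rounding can push values just outside [0, 2]; np.histogram drops
    # anything outside `range`, so clip to keep every pair counted.
    return np.clip(1.0 - a_n @ b_n.T, 0.0, 2.0)


rng = np.random.default_rng(0)
x = rng.normal(size=(103, 8))  # 103 rows: deliberately not a multiple of batch_size
d = cosine_distances(x, x)     # full (n, n) matrix, reference only

bins, value_range = 100, (0.0, 2.0)
full_hist, edges = np.histogram(d, bins=bins, range=value_range)

n = x.shape[0]
batch_size = 25
n_batches = (n + batch_size - 1) // batch_size  # 5; floor division would give 4
total = np.zeros(bins, dtype=np.int64)
for b in range(n_batches):
    cols = slice(b * batch_size, (b + 1) * batch_size)  # slices clamp at n
    h, _ = np.histogram(d[:, cols], bins=bins, range=value_range)
    total += h

print(np.array_equal(total, full_hist), total.sum() == n * n)
```

With floor division the loop would stop after 100 of the 103 columns, and the summed histogram would come up short by those pairs.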

However, what I can tell you is that if you include a ~12 GiB NumPy array in the `map_blocks` call at https://github.com/dask/dask-ml/blob/b95ba909c6dcd37c566f5193ba0b918396edaaee/dask_ml/metrics/pairwise.py#L60-L67, that array will be replicated to the scheduler and all dask workers. I doubt this is what you want.

> Do I have the patch if I install from source?

I just checked, and this was already released in 2024.2.1 (the version you are running). By breaking up the array you avoid all sorts of problems, so if this is possible, go for it.

lsc64 commented 3 months ago

> Sorry, I missed that. I haven't looked at your batching code closely enough to confirm that it is correct. If it is, you may want to contribute it to dask-ml, since a "proper" dask algorithm would work similarly. I don't know enough about the `pairwise_distances` algorithm to tell.

No worries. I just like to leave code snippets in case anyone runs into the same issue, so they aren't faced with the unhelpful "nvm I solved it". I can open a PR at some point and we can discuss it there.

> However, what I can tell you is that if you include a ~12 GiB NumPy array in the `map_blocks` call at https://github.com/dask/dask-ml/blob/b95ba909c6dcd37c566f5193ba0b918396edaaee/dask_ml/metrics/pairwise.py#L60-L67, that array will be replicated to the scheduler and all dask workers. I doubt this is what you want.

For `X.map_blocks(fn, Y)`, does the array Y get fully replicated, X, or both? And that array (or those arrays) is not materialized, right? The docs give an example that is exactly what I (and the function) want to achieve, just with huge arrays and `lambda a, b: distance(a, b)`:

```python
import dask.array as da

d = da.arange(5, chunks=2)
e = da.arange(5, chunks=2)
f = da.map_blocks(lambda a, b: a + b**2, d, e)
f.compute()  # array([ 0,  2,  6, 12, 20])
```
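A small experiment that bears on the question above, using toy shapes and a hypothetical `block_rows` helper that reports how many rows of its second argument each block call receives. As far as I can tell, `map_blocks` splits a dask-array argument into the block aligned with each block of `X`, whereas a plain NumPy argument is handed whole to every block call, which is consistent with the point that a large NumPy `Y` has to reach wherever those tasks run.

```python
import numpy as np
import dask.array as da

x = da.ones((6, 4), chunks=(2, 4))      # three row blocks of 2 rows each
y_numpy = np.ones((6, 4))               # plain NumPy argument
y_dask = da.ones((6, 4), chunks=(2, 4))  # dask argument with matching chunks


def block_rows(a, b):
    # Hypothetical helper: report how many rows of `b` this block call saw.
    return np.full((a.shape[0], 1), b.shape[0])


# NumPy argument: every block call receives all 6 rows of y_numpy.
seen_numpy = x.map_blocks(block_rows, y_numpy, chunks=(2, 1), dtype=int)
# Dask argument: each block call receives only its aligned 2-row block.
seen_dask = x.map_blocks(block_rows, y_dask, chunks=(2, 1), dtype=int)

print(seen_numpy.compute(scheduler="synchronous").ravel())
print(seen_dask.compute(scheduler="synchronous").ravel())
```

In both cases nothing is materialized until `compute()`; the difference is only in what each task gets as its second argument.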

> I just checked, and this was already released in 2024.2.1 (the version you are running). By breaking up the array you avoid all sorts of problems, so if this is possible, go for it.

We need bigger graphs!! /s (but maybe actually)