dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Network transfer of objects with circular recursion hangs #8378

Open andreypz opened 10 months ago

andreypz commented 10 months ago

Describe the issue: I have Dask submitting jobs to Condor. The jobs seem to work fine and produce their output. However, they crash at the end with the following errors:

2023-11-29 14:21:19,635 - distributed.sizeof - WARNING - Sizeof calculation failed. Defaulting to -1 B
Traceback (most recent call last):
  File "xxx/python3.10/site-packages/distributed/sizeof.py", line 17, in safe_sizeof
    return sizeof(obj)
  File "xxx/python3.10/site-packages/dask/utils.py", line 642, in __call__
    return meth(arg, *args, **kwargs)
  File "xxx/python3.10/site-packages/dask/sizeof.py", line 96, in sizeof_python_dict
    + sizeof(list(d.values()))
  File "xxx/python3.10/site-packages/dask/utils.py", line 642, in __call__
    return meth(arg, *args, **kwargs)
  File "xxx/python3.10/site-packages/dask/sizeof.py", line 59, in sizeof_python_collection
    return sys.getsizeof(seq) + sum(map(sizeof, seq))
  File "xxx/python3.10/site-packages/dask/utils.py", line 642, in __call__

 --> Here the last two frames repeat in a cycle <--

RecursionError: maximum recursion depth exceeded

The workers crash because the maximum recursion depth is exceeded. The problem seems to be either in safe_sizeof() or in the dispatched method (meth):

    meth = self.dispatch(type(arg))
    return meth(arg, *args, **kwargs)
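The repeating frames in the traceback can be illustrated without Dask. Below is a minimal sketch, assuming a simplified stand-in for the dispatching sizeof; the names naive_sizeof and cycle_safe_sizeof are hypothetical and are not part of dask. It shows why a recursive size estimate never terminates on a self-referencing dict, and how tracking visited object ids would avoid that.

```python
import sys


def naive_sizeof(obj):
    """Recursive size estimate with no cycle protection (hypothetical helper)."""
    if isinstance(obj, dict):
        return (
            sys.getsizeof(obj)
            + naive_sizeof(list(obj.keys()))
            + naive_sizeof(list(obj.values()))
        )
    if isinstance(obj, (list, tuple, set, frozenset)):
        return sys.getsizeof(obj) + sum(map(naive_sizeof, obj))
    return sys.getsizeof(obj)


def cycle_safe_sizeof(obj, _seen=None):
    """Same idea, but each object is counted at most once (hypothetical helper)."""
    seen = set() if _seen is None else _seen
    if id(obj) in seen:
        # Already counted: either a shared reference or a cycle.
        return 0
    seen.add(id(obj))
    if isinstance(obj, dict):
        return sys.getsizeof(obj) + sum(
            cycle_safe_sizeof(k, seen) + cycle_safe_sizeof(v, seen)
            for k, v in obj.items()
        )
    if isinstance(obj, (list, tuple, set, frozenset)):
        return sys.getsizeof(obj) + sum(cycle_safe_sizeof(x, seen) for x in obj)
    return sys.getsizeof(obj)


# A dict that contains itself
d = {}
d[0] = d

try:
    naive_sizeof(d)
    naive_failed = False
except RecursionError:
    naive_failed = True

safe_size = cycle_safe_sizeof(d)
```

This is only an illustration of the failure mode, not a description of how dask computes sizes internally.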

Minimal Complete Verifiable Example: None

Environment:

fjetter commented 10 months ago

Do you have a reproducer for this error? If safe_sizeof fails, it should not fail your computation.

cc @crusaderky

crusaderky commented 10 months ago

Reproduced. Fairly sure it's not safe_sizeof though.

import distributed
client = distributed.Client(n_workers=1)

def f():
    d = {}
    d[0] = d
    return d

fut = client.submit(f, key="x")
distributed.wait(fut)
# So far so good - infinite recursion is handled gracefully
2023-11-30 14:23:48,192 - distributed.sizeof - WARNING - Sizeof calculation failed. Defaulting to 0.95 MiB
Traceback (most recent call last):
...
RecursionError: maximum recursion depth exceeded while calling a Python object

# Task is finished successfully and output is stored on the worker
client.run(lambda dask_worker: str(dask_worker.data["x"]))
{'tcp://127.0.0.1:35311': '{0: {...}}'}

# However, network transfer hangs
fut.result()

gather_dep from one worker to another also hangs.

I worked on this fairly recently (#8214). Investigating.
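To check whether a task result is affected, one option is to test it for self-references before returning it. The sketch below is an assumption-laden illustration: has_cycle is a hypothetical helper, it only looks inside built-in containers, and it is itself recursive, so very deeply nested (but acyclic) data could still hit the recursion limit.

```python
def has_cycle(obj, _path=None):
    """Return True if obj contains a reference back to one of its own ancestors.

    Hypothetical helper: only dict, list, tuple, set and frozenset are walked.
    """
    path = set() if _path is None else _path
    if not isinstance(obj, (dict, list, tuple, set, frozenset)):
        return False
    if id(obj) in path:
        # obj is already on the current traversal path, so this is a cycle.
        return True
    path.add(id(obj))
    try:
        # Dict keys must be hashable, so only the values are walked here.
        children = list(obj.values()) if isinstance(obj, dict) else list(obj)
        return any(has_cycle(child, path) for child in children)
    finally:
        # Leaving this branch: a later sibling may legitimately share obj.
        path.discard(id(obj))


# The structure from the reproducer
d = {}
d[0] = d
cyclic = has_cycle(d)

# Shared (but acyclic) references are not reported as cycles
shared = [1, 2]
acyclic = has_cycle({"a": shared, "b": shared})
```

Tracking only the current path, rather than every object seen so far, is what lets the same list appear twice without being flagged.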

fjetter commented 10 months ago

FWIW If #8214 is the cause, this has already been released

crusaderky commented 10 months ago

Reproduced with dask=2023.9.3 msgpack=1.0.5 (before #8214). This is not a recent regression.

andreypz commented 8 months ago

Just to mention that the issue is still there after upgrading to dask/distributed=2023.12.1.

crusaderky commented 7 months ago

Even more minimal reproducer:

>>> from distributed.protocol import serialize
>>> d = {}
>>> d[0] = d
>>> serialize(d)
RecursionError: maximum recursion depth exceeded
>>> from collections import UserDict
>>> d2 = UserDict(d)  # Wrap in opaque object to use plain pickle
>>> serialize(d2)
({'serializer': 'pickle', 'writeable': ()},
 [b'\x80\x05\x956\x00\x00\x00\x00\x00\x00\x00\x8c\x0bcollections\x94\x8c\x08UserDict\x94\x93\x94)\x81\x94}\x94\x8c\x04data\x94}\x94K\x00}\x94K\x00h\x07sssb.'])
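The UserDict case succeeds because plain pickle handles recursive structures. A small standard-library sketch of that behavior follows; it uses pickle directly and makes no claim about what distributed.protocol.serialize does internally.

```python
import pickle
from collections import UserDict

# A dict that contains itself
d = {}
d[0] = d

# pickle memoizes objects it has already started writing, so the
# self-reference is stored as a back-reference instead of being expanded.
restored = pickle.loads(pickle.dumps(d))
cycle_preserved = restored[0] is restored

# The same holds when the cyclic dict is wrapped in a UserDict.
d2 = UserDict(d)
restored2 = pickle.loads(pickle.dumps(d2))
inner = restored2[0]  # the cyclic dict held inside the wrapper
inner_cycle_preserved = inner[0] is inner
```

Both flags come out True, meaning the round trip rebuilds the cycle rather than recursing without end.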

tuckerbuchy commented 5 months ago

I am experiencing the same issue when attempting to use performance_report on the execution of some Futures.