dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Using `log_event` in task with recursive object causes client to wait indefinitely #8704

Open hendrikmakait opened 4 months ago

hendrikmakait commented 4 months ago

Reproducer

```python
from distributed.utils_test import gen_cluster


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_client_waits_indefinitely_on_log_event_recursion_error(c, s, a):
    def error(*args):
        from distributed import get_worker

        # Build a self-referential list, then log it as an event.
        inf = []
        inf.append(inf)
        get_worker().log_event("inf", inf)

    await c.submit(error, 0)
```

This test times out with the following logs:

2024-06-19 09:00:36,715 - distributed.protocol.pickle - ERROR - Failed to serialize maximum recursion depth exceeded in __instancecheck__.
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/worker.py", line 2997, in apply_function_simple
    result = function(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/tests/test_client.py", line 8577, in error
    get_worker().log_event("inf", inf)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/worker.py", line 973, in log_event
    if not _is_dumpable(msg):
           ^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/serialize.py", line 896, in _is_dumpable
    and all(map(_is_dumpable, v))
        ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/serialize.py", line 896, in _is_dumpable
    and all(map(_is_dumpable, v))
        ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/serialize.py", line 896, in _is_dumpable
    and all(map(_is_dumpable, v))
        ^^^^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 491 more times]
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/serialize.py", line 895, in _is_dumpable
    or isinstance(v, (list, tuple))
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RecursionError: maximum recursion depth exceeded in __instancecheck__

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RecursionError: maximum recursion depth exceeded while pickling an object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/pickle.py", line 24, in reducer_override
    if _always_use_pickle_for(obj):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/pickle.py", line 35, in _always_use_pickle_for
    mod, _, _ = x.__class__.__module__.partition(".")
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RecursionError: maximum recursion depth exceeded while calling a Python object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
RecursionError: maximum recursion depth exceeded while pickling an object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1249, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
2024-06-19 09:00:36,743 - distributed.protocol.pickle - ERROR - Failed to serialize <traceback object at 0x171b4eb40>.
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/worker.py", line 2997, in apply_function_simple
    result = function(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/tests/test_client.py", line 8577, in error
    get_worker().log_event("inf", inf)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/worker.py", line 973, in log_event
    if not _is_dumpable(msg):
           ^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/serialize.py", line 896, in _is_dumpable
    and all(map(_is_dumpable, v))
        ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/serialize.py", line 896, in _is_dumpable
    and all(map(_is_dumpable, v))
        ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/serialize.py", line 896, in _is_dumpable
    and all(map(_is_dumpable, v))
        ^^^^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 491 more times]
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/serialize.py", line 895, in _is_dumpable
    or isinstance(v, (list, tuple))
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RecursionError: maximum recursion depth exceeded in __instancecheck__

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RecursionError: maximum recursion depth exceeded while pickling an object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/pickle.py", line 24, in reducer_override
    if _always_use_pickle_for(obj):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
RecursionError: maximum recursion depth exceeded

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.11/collections/__init__.py", line 1006, in __getitem__
    return self.__missing__(key)            # support subclasses that define __missing__
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.11/collections/__init__.py", line 998, in __missing__
    raise KeyError(key)
          ^^^^^^^^^^^^^
RecursionError: maximum recursion depth exceeded while calling a Python object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1249, in dump
    raise pickle.PicklingError(msg) from e
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
2024-06-19 09:00:36,760 - distributed.worker - WARNING - Compute Failed
Key:       error-9305f5e988d6c46af3092e961be0e742
State:     executing
Function:  error
args:      (0)
kwargs:    {}
Exception: "RecursionError('maximum recursion depth exceeded in __instancecheck__')"

2024-06-19 09:00:36,761 - distributed.client - ERROR - 
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/client.py", line 1595, in _handle_report
    result = handler(**msg)
             ^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/client.py", line 1638, in _handle_task_erred
    state.set_error(exception, traceback)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/client.py", line 613, in set_error
    _, exception, traceback = clean_exception(exception, traceback)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 1862, in clean_exception
    assert isinstance(traceback, types.TracebackType) or traceback is None
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError

Note that the task is correctly transitioned to erred on the scheduler, but the client waits indefinitely because it cannot process the error report: the traceback fails to serialize, so the `assert` in `clean_exception` fails and the future is never resolved on the client.
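For context, the stdlib `pickle` module memoizes objects it has already seen, so a self-referential list round-trips fine on its own; the unbounded recursion comes from walking the container recursively without any cycle guard, as `_is_dumpable` does in the first traceback above. The sketch below reproduces that contrast with a naive check and a hypothetical cycle-guarded variant (`guarded_is_dumpable` is an illustration, not the `distributed` API):

```python
import pickle

# A self-referential list like the one in the reproducer.
inf = []
inf.append(inf)

# pickle itself memoizes containers, so the cycle round-trips fine.
restored = pickle.loads(pickle.dumps(inf))
assert restored[0] is restored

def naive_is_dumpable(v):
    # Similar in spirit to _is_dumpable: recurse into containers
    # with no memoization, so a cycle recurses forever.
    return (
        isinstance(v, (str, bytes, int, float, bool, type(None)))
        or (isinstance(v, (list, tuple)) and all(map(naive_is_dumpable, v)))
    )

try:
    naive_is_dumpable(inf)
except RecursionError:
    print("naive check raised RecursionError")

def guarded_is_dumpable(v, _seen=None):
    # Hypothetical fix: thread a set of visited container ids through
    # the recursion and stop when a container is revisited.
    if isinstance(v, (str, bytes, int, float, bool, type(None))):
        return True
    if isinstance(v, (list, tuple)):
        _seen = _seen if _seen is not None else set()
        if id(v) in _seen:
            return True  # already being checked; don't recurse again
        _seen.add(id(v))
        return all(guarded_is_dumpable(item, _seen) for item in v)
    return False

print(guarded_is_dumpable(inf))
```

Any real fix in `log_event` would also need to handle the second failure mode shown above, where the traceback object of the resulting `RecursionError` itself fails to serialize.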

fjetter commented 4 months ago

I think this is a duplicate of https://github.com/dask/distributed/issues/8378