quasiben opened this issue 1 year ago
@quasiben can you share a bit more information about what is actually breaking? I outlined in https://github.com/dask/distributed/issues/7639#issuecomment-1490101858 that I am deeply confused why this is causing any breakage. I discuss how this finalizer, even if called, should not have any effect, and everything in the CPython docs suggests that this finalizer should not even be called.
I would like to ensure that we cover the functionality you require and make sure this is not just a red herring. I do not want to assert on functionality that the CPython docs tell me should not even be there.
@quasiben can you share a bit more information about what is actually breaking? I outlined in #7639 (comment) that I am deeply confused why this is causing any breakage.
In the MRE, the breakage occurs when the client/cluster are shutting down: a bunch of exceptions are raised and everything hangs until the process ultimately times out after 60 seconds. The person who originally reported the issue, @randerzander, also mentioned it occurs during his workflows. I can't say for sure what happens there because we're trying to work on the MRE first, but I imagine it occurs when workers are getting restarted. @randerzander please add more information about when it happens, if this is known.
I discuss how this finalizer, even if called, should not have any effect, and everything in the CPython docs suggests that this finalizer should not even be called.
I think we can for now discard the claim that it "should not be called"; it's pretty clear at this point that it is called, even with protocol="tcp": the reproducer above proves it's being called. As for the statement that it should not have any effect, I honestly haven't looked into that in depth, but it clearly enforces some ordering of cleanup events that otherwise do not occur or happen differently.
I would like to ensure that we cover the functionality you require and make sure this is not just a red herring. I do not want to assert on functionality that the CPython docs tell me should not even be there.
Ultimately, I agree. The problem here seems to be that there are no guarantees on the ordering of object destruction; the ordering is in fact unpredictable. For instance, _offload_executor gets created at distributed.utils import time, which makes it hard to reason about when things should be destroyed and whether we in fact destroy everything correctly, and it makes the cleanup in Distributed very fragile. This sort of cleanup issue has come up multiple times in the past two years or so.
FWIW, we just need any finalizer here to fix Ben's initial repro:
e.g. this runs fine for me if I add this patch:
diff --git a/distributed/utils.py b/distributed/utils.py
index 4c1a6642..64904402 100644
--- a/distributed/utils.py
+++ b/distributed/utils.py
@@ -1401,7 +1401,7 @@ def is_valid_xml(text):
_offload_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dask-Offload")
-
+weakref.finalize(_offload_executor, lambda: None)
def import_term(name: str) -> AnyType:
"""Return the fully qualified term
I think Python makes a best-effort attempt to run finalizers for objects involved in ref-cycles at interpreter shutdown, and probably we've just been lucky that the particular combination of finalizers has been such that things run in an order that happens to work.
I don't think this is a clean solution, but it might be sufficient to patch over the issue for now.
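As a standalone illustration of the mechanism being relied on here (a minimal sketch, not distributed code): a weakref.finalize callback attached to a still-alive module-global executor is invoked at interpreter shutdown by weakref's internal atexit hook.

```python
import weakref
from concurrent.futures import ThreadPoolExecutor

# Module-global executor, analogous to distributed's _offload_executor.
_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="Demo-Offload")

# The finalizer fires at interpreter exit even though the executor is still
# alive at that point (finalizers run in reverse order of creation).
weakref.finalize(_executor, print, "finalizer ran at interpreter shutdown")

_executor.submit(print, "doing some offloaded work").result()
```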
As another data point, if I just completely cull the global _offload_executor object, Ben's issue still manifests.
It seems to be something to do with some data-structure setup that I don't think should be relied on, but if I create a no-op finalizer before the atexit handler in distributed/deploy/spec.py is registered, then Ben's example also works fine. That is:
Cluster closes gracefully:
diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py
index c35ec764..134a3bd8 100644
--- a/distributed/deploy/spec.py
+++ b/distributed/deploy/spec.py
@@ -686,6 +686,9 @@ async def run_spec(spec: dict[str, Any], *args: Any) -> dict[str, Worker | Nanny
return workers
+weakref.finalize(lambda: None, lambda: None)
+
+
@atexit.register
def close_clusters():
for cluster in list(SpecCluster._instances):
Interpreter segfaults:
diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py
index c35ec764..34bbe68f 100644
--- a/distributed/deploy/spec.py
+++ b/distributed/deploy/spec.py
@@ -693,3 +693,6 @@ def close_clusters():
with suppress(gen.TimeoutError, TimeoutError):
if getattr(cluster, "status", Status.closed) != Status.closed:
cluster.close(timeout=10)
+
+
+weakref.finalize(lambda: None, lambda: None)
Apologies @fjetter for not mentioning this earlier. I think you are seeing some escalation here because there is a bit of urgency for us (RAPIDS). We are going through a RAPIDS release now and had just discovered this bug yesterday (bad timing on our part), and we had/have pinned to Dask 2023.3.2. This Dask release includes PR https://github.com/dask/distributed/pull/7644 and thus we are scrambling a bit for a short-term fix.
There was hope that we could work around this issue on our end, and that hope led me to delay mentioning our sense of urgency -- my apologies for not laying it all out right away. Perhaps @wence- and I can continue exploring the path Lawrence mentioned in the previous issue while we simultaneously produce a release 2023.3.2.1 with the commit reverted.
Thanks @wence- ! What you are posting in https://github.com/dask/distributed/issues/7726#issuecomment-1490666290 is very interesting and aligns with my suspicions that there is something else going on.
Interpreter segfaults:
So much fun. What Python version was this on? The place where this is raising is interesting, since it's again one of those "catch all" atexits, close_clusters. I would prefer not having any of those.
I'm happy to add any harmless patches to the code base (i.e. also fine reverting the original change) if that unblocks your release but I would like us to make progress on making this more robust. It looks like the RAPIDS team is hit by this rather frequently. If there is anything we can or should refactor to make this easier to maintain we should talk about this. I'm counting on help from the RAPIDS team for this. Given this bug report I don't even know what kind of assertion is breaking or what kind of resource is not properly cleaned up.
So much fun. What python version was this on? The place where this is raising is interesting since it's again one of those "catch all" atexits, close_clusters. I would prefer not having any of those
This is 3.10, but I think it's the combination with UCX that is causing the segfault. Things have somehow already gone wrong well before we get the segfault, so various invariants are not maintained and (sometimes) the asyncio implementation bugs out.
I'm happy to add any harmless patches to the code base (i.e. also fine reverting the original change) if that unblocks your release but I would like us to make progress on making this more robust. It looks like the RAPIDS team is hit by this rather frequently. If there is anything we can or should refactor to make this easier to maintain we should talk about this. I'm counting on help from the RAPIDS team for this.
Yeah, I would like to understand this. I am generally leery of state teardown in atexit handlers since ordering is import-order sensitive and that seems wrong.
My reading of the tea-leaves suggests that something like the following is happening: in the close_clusters atexit handler we see this backtrace (and then hang):
Exception ignored in atexit callback: <function close_clusters at 0x7f86149e3e20>
Traceback (most recent call last):
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/deploy/spec.py", line 695, in close_clusters
cluster.close(timeout=10)
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/deploy/cluster.py", line 219, in close
return self.sync(self._close, callback_timeout=timeout)
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 349, in sync
return sync(
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 416, in sync
raise exc.with_traceback(tb)
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 389, in f
result = yield future
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
value = future.result()
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 1849, in wait_for
return await asyncio.wait_for(fut, timeout)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
return fut.result()
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/deploy/spec.py", line 441, in _close
await self._correct_state()
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/deploy/spec.py", line 348, in _correct_state_internal
await self.scheduler_comm.retire_workers(workers=list(to_close))
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/core.py", line 1185, in send_recv_from_rpc
comm = await self.live_comm()
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/core.py", line 1144, in live_comm
comm = await connect(
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/comm/core.py", line 292, in connect
comm = await wait_for(
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 1849, in wait_for
return await asyncio.wait_for(fut, timeout)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
return fut.result()
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/comm/ucx.py", line 466, in connect
ep = await ucp.create_endpoint(ip, port)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py", line 1004, in create_endpoint
return await _get_ctx().create_endpoint(
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py", line 304, in create_endpoint
ucx_ep = ucx_api.UCXEndpoint.create(
File "ucp/_libs/ucx_endpoint.pyx", line 255, in ucp._libs.ucx_api.UCXEndpoint.create
AssertionError:
What looks to be going on here is that the cluster shutdown is somehow happening after the ucx-py resources have been released, so during finalization of the cluster there is an attempt to resurrect the comm channel, which fails because UCX is already shut down.
I don't really understand how the object ownership model is supposed to work in this case right now (i.e. who is responsible for teardown, in which order) in these kinds of setups where all the workers and so forth are booted in the background.
Here's an exemplar backtrace for the segfault case. As you can see, the GC is completely hosed at this point:
#8 <signal handler called>
#9 _PyObject_IS_GC (obj=<unknown at remote 0x143159b29b44>) at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:452
#10 visit_decref (parent=<optimised out>, op=<unknown at remote 0x143159b29b44>)
at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:456
#11 list_traverse (o=0x7fb55de6e300, visit=0x555b330477b0 <visit_decref>, arg=0x7fb55de6e300)
at /usr/local/src/conda/python-3.10.10/Objects/listobject.c:2653
#12 0x0000555b330473e5 in subtract_refs (containers=<optimised out>)
at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:482
#13 deduce_unreachable (base=base@entry=0x555b3340ad60, unreachable=unreachable@entry=0x7fb569c217c0)
at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:1105
#14 0x0000555b3304606f in gc_collect_main (tstate=0x555b33e27eb0, generation=0, n_collected=0x7fb569c218a0,
n_uncollectable=0x7fb569c21898, nofail=0) at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:1239
#15 0x0000555b330fa98e in gc_collect_with_callback (tstate=tstate@entry=0x555b33e27eb0, generation=0)
at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:1413
#16 0x0000555b33044c50 in gc_collect_generations (tstate=0x555b33e27eb0)
at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:1468
#17 _PyObject_GC_Alloc (basicsize=56, use_calloc=0) at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:2297
#18 _PyObject_GC_Malloc (basicsize=56) at /usr/local/src/conda/python-3.10.10/Modules/gcmodule.c:2307
#19 PyType_GenericAlloc (type=0x555b3363c3b0, nitems=0) at /usr/local/src/conda/python-3.10.10/Objects/typeobject.c:1156
#20 0x0000555b3305cc09 in type_call (kwds={'lookup_line': False, 'locals': None},
args=('/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py', 725, 'recv'),
type=0x555b3363c3b0) at /usr/local/src/conda/python-3.10.10/Objects/typeobject.c:1123
#21 _PyObject_MakeTpCall (tstate=0x555b33e27eb0, callable=<type at remote 0x555b3363c3b0>, args=<optimised out>,
nargs=<optimised out>, keywords=('lookup_line', 'locals')) at /usr/local/src/conda/python-3.10.10/Objects/call.c:215
#22 0x0000555b33058ec9 in _PyObject_VectorcallTstate (kwnames=('lookup_line', 'locals'), nargsf=<optimised out>,
args=<optimised out>, callable=<type at remote 0x555b3363c3b0>, tstate=0x555b33e27eb0)
at /usr/local/src/conda/python-3.10.10/Include/cpython/abstract.h:112
#23 _PyObject_VectorcallTstate (kwnames=('lookup_line', 'locals'), nargsf=<optimised out>, args=<optimised out>,
callable=<type at remote 0x555b3363c3b0>, tstate=0x555b33e27eb0)
at /usr/local/src/conda/python-3.10.10/Include/cpython/abstract.h:99
#24 PyObject_Vectorcall (kwnames=('lookup_line', 'locals'), nargsf=<optimised out>, args=<optimised out>,
callable=<type at remote 0x555b3363c3b0>) at /usr/local/src/conda/python-3.10.10/Include/cpython/abstract.h:123
#25 call_function (kwnames=('lookup_line', 'locals'), oparg=<optimised out>, pp_stack=<synthetic pointer>,
trace_info=0x7fb569c21a30, tstate=<optimised out>) at /usr/local/src/conda/python-3.10.10/Python/ceval.c:5891
#26 _PyEval_EvalFrameDefault (tstate=<optimised out>,
f=Frame 0x7fb564daa8e0, for file /home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py, line 376, in extract (klass=<type at remote 0x555b3362e910>, frame_gen=<generator at remote 0x7fb557f22110>, limit=None, lookup_lines=True, capture_locals=False, result=<StackSummary at remote 0x7fb554b583b0>, fnames={'/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py', '/home/wence/Documents/src/rapids/third-party/distributed/distributed/comm/ucx.py'}, f=Frame 0x7fb557f84fc0, for file /home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py, line 725, in recv (self=<Endpoint(_ep=<ucp._libs.ucx_api.UCXEndpoint at remote 0x7fb557f20c80>, _ctx=<ApplicationContext(progress_tasks=[], context=<ucp._libs.ucx_api.UCXContext at remote 0x7fb55de11ea0>, worker=<ucp._libs.ucx_api.UCXWorker at remote 0x7fb55df6dd40>, blocking_progress_mode=True, epoll_fd=49) at remote 0x7fb55de30be0>, _send_count=6, _recv_count=10, _finis...(truncated), throwflag=<optimised out>) at /usr/local/src/conda/python-3.10.10/Python/ceval.c:4231
#27 0x0000555b3306f581 in _PyEval_EvalFrame (throwflag=0,
f=Frame 0x7fb564daa8e0, for file /home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py, line 376, in extract (klass=<type at remote 0x555b3362e910>, frame_gen=<generator at remote 0x7fb557f22110>, limit=None, lookup_lines=True, capture_locals=False, result=<StackSummary at remote 0x7fb554b583b0>, fnames={'/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py', '/home/wence/Documents/src/rapids/third-party/distributed/distributed/comm/ucx.py'}, f=Frame 0x7fb557f84fc0, for file /home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/core.py, line 725, in recv (self=<Endpoint(_ep=<ucp._libs.ucx_api.UCXEndpoint at remote 0x7fb557f20c80>, _ctx=<ApplicationContext(progress_tasks=[], context=<ucp._libs.ucx_api.UCXContext at remote 0x7fb55de11ea0>, worker=<ucp._libs.ucx_api.UCXWorker at remote 0x7fb55df6dd40>, blocking_progress_mode=True, epoll_fd=49) at remote 0x7fb55de30be0>, _send_count=6, _recv_count=10, _finis...(truncated), tstate=0x555b33e27eb0)
at /usr/local/src/conda/python-3.10.10/Include/internal/pycore_ceval.h:46
#28 _PyEval_Vector (kwnames=<optimised out>, argcount=<optimised out>, args=0x7fb564c21f10, locals=0x0, con=0x7fb675dcdbe0,
With matching Python backtrace:
(gdb) py-bt
Traceback (most recent call first):
Garbage-collecting
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py", line 376, in extract
result.append(FrameSummary(
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py", line 502, in __init__
self.stack = StackSummary.extract(
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py", line 552, in __init__
context = TracebackException(
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/traceback.py", line 119, in print_exception
te = TracebackException(type(value), value, tb, limit=limit, compact=True)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 636, in formatException
traceback.print_exception(ei[0], ei[1], tb, None, sio)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 686, in format
record.exc_text = self.formatException(record.exc_info)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 943, in format
return fmt.format(record)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1100, in emit
msg = self.format(record)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 968, in handle
self.emit(record)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1696, in callHandlers
hdlr.handle(record)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1634, in handle
self.callHandlers(record)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1624, in _log
self.handle(record)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1506, in error
self._log(ERROR, msg, args, **kwargs)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/logging/__init__.py", line 1512, in exception
self.error(msg, *args, exc_info=exc_info, **kwargs)
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 778, in __exit__
logger.exception(exc_value)
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 751, in wrapper
with self:
<built-in method task_wakeup of _asyncio.Task object at remote 0x7fb55df57850>
<built-in method run of _contextvars.Context object at remote 0x7fb55dddd840>
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/base_events.py", line 1909, in _run_once
handle._run()
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/base_events.py", line 603, in run_forever
self._run_once()
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/tornado/platform/asyncio.py", line 215, in start
self.asyncio_loop.run_forever()
File "/home/wence/Documents/src/rapids/third-party/distributed/distributed/utils.py", line 510, in run_loop
loop.start()
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/threading.py", line 973, in _bootstrap
self._bootstrap_inner()
I am generally leery of state teardown in atexit handlers since ordering is import-order sensitive and that seems wrong.
Collecting all atexit handlers in a single module and registering them in the "correct" order might already be helpful, wouldn't it? FWIW, I don't think anybody likes the atexit handlers. If we can somehow ensure proper cleanup without them, that would be very interesting. The only mechanism I am aware of that works robustly is a context manager, but ages ago the decision was made that our objects should work without the need for context managers, and I am afraid this is a decision that is difficult to revoke by now. Particularly in notebook environments I believe this is a requirement.
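For completeness, the context-manager form mentioned above does exist today for code that can tolerate an explicit scope. A minimal sketch (using LocalCluster purely as an example) where shutdown ordering is explicit and independent of atexit registration order:

```python
from distributed import Client, LocalCluster

def main() -> None:
    # Both objects are closed deterministically at the end of the `with`
    # block, before interpreter shutdown, so no atexit handler or weakref
    # finalizer is left to tear them down in an unpredictable order.
    with LocalCluster(n_workers=1) as cluster, Client(cluster) as client:
        print(client.submit(sum, [1, 2, 3]).result())

if __name__ == "__main__":
    main()
```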
So there are, AFAICT, three atexit handlers:
from ._concurrent_futures_thread import _python_exit
from .client import _close_global_client
from .deploy.spec import close_clusters
# atexit.register(_python_exit)
# atexit.register(_close_global_client)
# atexit.register(close_clusters)
del _python_exit
del _close_global_client
del close_clusters
If I remove the registration of all of them and run the bug with python bug.py, then the code "completes" but produces a few warnings. If I do python -X dev -X tracemalloc bug.py, then I see:
shutting down...
<Client: 'ucx://127.0.0.1:58631' processes=1 threads=1, memory=124.45 GiB>
[1680262822.127658] [shallot:51402] UCXPY ERROR Non-thread-safe operation invoked on an event loop other than the current one
Traceback (most recent call last):
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/_libs/exceptions.py", line 13, in log_errors
yield
File "ucp/_libs/transfer_tag.pyx", line 141, in ucp._libs.ucx_api._tag_recv_callback
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/site-packages/ucp/comm.py", line 16, in _cb_func
future.set_exception(exception)
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/base_events.py", line 755, in call_soon
self._check_thread()
File "/home/wence/Documents/apps/mambaforge/envs/test-weakref/lib/python3.10/asyncio/base_events.py", line 792, in _check_thread
raise RuntimeError(
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
So somehow some event loop is out of whack.
Having done some more debugging (with @pentschev), the Comm object is being torn down on a thread other than the one that created it. This is problematic since ucx-py is not thread-safe, and it appears that we either end up in a deadlock (where the main thread holds the UCX spinlock but not the GIL, and the communication thread holds the GIL but not the spinlock), or we get a little further, the GIL is released, and the cleanup occurs but smashes some part of the interpreter stack and we see segfaults.
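A sketch of the thread-affinity discipline this implies (hypothetical names, not distributed's or ucx-py's actual implementation): capture the owning event loop at creation time and marshal teardown back onto it, instead of touching the non-thread-safe resources from whichever thread happens to drop the last reference.

```python
import asyncio
import threading

class OwnedComm:
    """Hypothetical comm whose (non-thread-safe) native resources must be
    released on the event-loop thread that created them."""

    def __init__(self) -> None:
        self._owner_loop = asyncio.get_running_loop()
        self._owner_thread = threading.get_ident()

    async def _do_close(self) -> None:
        ...  # release native endpoints here, on the owning thread

    def close(self) -> None:
        if threading.get_ident() == self._owner_thread:
            # Already on the owning thread: just schedule the close.
            self._owner_loop.create_task(self._do_close())
        else:
            # Called from a finalizer/atexit handler on another thread:
            # hand the teardown back to the owning loop.
            asyncio.run_coroutine_threadsafe(self._do_close(), self._owner_loop)
```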
The hypothesis as to why this no-op weakref finalizer "fixes" things is that it fortuitously happens to arrange that everything is torn down on the thread that owns the resource.
So, we don't have a solution right now (certainly not a quick fix that is less dangerous than re-introducing this no-op finalization).
The hypothesis as to why this no-op weakref finalizer "fixes" things is that it fortuitously happens to arrange that everything is torn down on the thread that owns the resource.
A little more detail.
tl;dr: weakref.finalize is not a no-op, because it changes the state of the atexit handler registry.
From the Python docs (and also by looking at the implementation of weakref.finalize):
When the program exits, each remaining live finalizer is called unless its atexit attribute has been set to false. They are called in reverse order of creation.
(Note that these calls are always made, even if the object is still alive).
How does this work? The first time weakref.finalize is called, it hooks an atexit callback in with atexit.register (let's call it weakref_atexit).
Distributed also registers some atexit handlers to run during shutdown. atexit handlers are run in last-in-first-out order.
So we can have two scenarios:
In scenario 1 the destruction order will be distributed_atexit, then weakref_atexit; in scenario 2 it will be weakref_atexit, then distributed_atexit. (Since atexit handlers run last-in-first-out, scenario 1 corresponds to weakref's hook being registered before distributed's handlers, and scenario 2 to it being registered after.)
UCX-Py uses weakref.finalize to clean up when its objects go out of scope, but these finalizers are set at object creation time. Because the cluster objects are only torn down in distributed_atexit, these finalizers have not yet fired when the atexit handlers start running (the comm is still live at this point).
If we're in scenario 1, all is good: distributed_atexit is called first, the UCX-Py comm channels are still live, so messages can be sent between client/cluster/scheduler/workers and everything shuts down. When those resources are released, the UCX-Py comms are closed and things work.
In scenario 2 we have a problem. The first atexit handler to fire is weakref_atexit, which tears down the comm structures; then distributed_atexit runs, which wants to send messages over the comm, only it's already torn down and there's not enough state to resurrect it, so we get hangs and other errors.
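A minimal sketch of the registration-order mechanics described above (hypothetical handler names, not distributed's actual handlers):

```python
import atexit
import weakref

class Resource:
    pass

r = Resource()

# The first weakref.finalize call anywhere in the process registers weakref's
# internal atexit hook -- call it "weakref_atexit".
weakref.finalize(r, print, "weakref_atexit: remaining live finalizers run")

# A handler registered afterwards -- call it "distributed_atexit".
@atexit.register
def distributed_atexit():
    print("distributed_atexit: close clusters while comms are still live")

# atexit handlers run last-in-first-out, so at interpreter exit this prints:
#   distributed_atexit: close clusters while comms are still live
#   weakref_atexit: remaining live finalizers run
# i.e. scenario 1. Reverse the registration order and the finalizers fire
# first (scenario 2), tearing down the comm before it is needed.
```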
Following up here because I notice that UCX tests are failing intermittently quite often on GPU CI:
https://gpuci.gpuopenanalytics.com/job/dask/job/distributed/job/prb/job/distributed-prb/6516/
I'm assuming this is related to the fact that the finalizer patch wasn't merged into main; maybe @pentschev or @wence- can confirm?
Interested in what we want to do here in the short term - should we xfail the UCX test module (or a subset of it) for now, or is there a modification we could make to the tests to unblock this trivially?
Having done some more debugging (with @pentschev), the Comm object is being torn down on a thread other than the one that created it.
This is interesting. I would expect all Comm objects to be created and destroyed on the main thread. Do you know which connection this is? (E.g. part of the ConnectionPool, BatchedSend, some ad-hoc connection.)
(Sorry for the late reply, I was busy/out for a while)
This is interesting. I would expect all Comm objects to be created and destroyed on the main thread. Do you know which connection this is? (E.g. part of the ConnectionPool, BatchedSend, some ad-hoc connection.)
I don't recall, unfortunately. I think this is because the destruction is somehow happening via an atexit handler, which can be on a different thread (it's hard to tell from the documentation/implementation what guarantees, if any, are given).
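A small diagnostic sketch for exactly this kind of question (hypothetical stand-in object, standalone): record which thread created an object and which thread its weakref.finalize callback actually runs on.

```python
import threading
import weakref

class Comm:  # hypothetical stand-in, not distributed's Comm
    pass

def _report(created_on: str) -> None:
    print(f"finalizer running on {threading.current_thread().name!r}, "
          f"object was created on {created_on!r}")

c = Comm()
weakref.finalize(c, _report, threading.current_thread().name)

# Whichever thread drops the last reference (or weakref's atexit hook at
# interpreter exit, if the object survives that long) is where the report
# will come from.
del c
```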
As mentioned in https://github.com/dask/distributed/issues/7639#issuecomment-1489013077, we are seeing what we think is a bug due to the removal of the finalizer for a ThreadPoolExecutor. We are observing this behavior when using UCX/UCX-Py (see https://github.com/dask/distributed/issues/7639#issuecomment-1489247249 for a more detailed explanation).
Note: this MRE has no GPU/CUDA elements, only UCX/UCX-py
The above produces errors like the following:
We think https://github.com/dask/distributed/pull/7644 is the culprit here, because when the finalizer is added back the errors are gone :). I confirmed the finalizer is being called with the following patch:
When using this patch we observe no errors and confirmation that the finalizer is being called:
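The actual patch and its output are not reproduced above; purely as a hypothetical sketch of the kind of confirmation described (assuming distributed's private module global distributed.utils._offload_executor), one could attach a logging finalizer like this:

```python
import weakref

from distributed.utils import _offload_executor  # private module global

def _report() -> None:
    print("offload executor finalizer called")

# Log when (and whether) a finalizer on the executor actually fires.
weakref.finalize(_offload_executor, _report)
```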