Open rikardn opened 8 months ago
We've done some nontrivial refactoring in how we're hashing/tokenizing objects (https://github.com/dask/dask/issues/10905)
I see this came up in an OSS project. Is it possible for you to link to the code this is running?
cc @crusaderky
Certainly. The problem is that the code is quite complex and the objects we use are also quite complex. Actually running this particular workflow currently requires a proprietary program. I was hoping to be able to create a smaller example, but wasn't able to. I could potentially do some debugging in dask/distributed myself if I knew where to start looking. Could it be that scattered objects get removed after they are deemed no longer needed?
The main github page: https://github.com/pharmpy/pharmpy
The call to scatter: https://github.com/pharmpy/pharmpy/blob/main/src/pharmpy/workflows/optimize.py
The call to gather: https://github.com/pharmpy/pharmpy/blob/main/src/pharmpy/workflows/call.py
Function that creates and calls the dynamic workflows: https://github.com/pharmpy/pharmpy/blob/b1323be4ae18652c9ca0efce40cde1b8237e5399/src/pharmpy/tools/ruvsearch/tool.py#L187
Creating the local cluster: https://github.com/pharmpy/pharmpy/blob/main/src/pharmpy/workflows/dispatchers/local_dask.py
Executing a workflow: https://github.com/pharmpy/pharmpy/blob/main/src/pharmpy/workflows/execute.py
The failed test: https://github.com/pharmpy/pharmpy/actions/runs/8260071687/job/22595069029
Could it be that scattered objects get removed after they are deemed no longer needed?
The lifetime of a scattered object is coupled to the Future instances. As soon as the Future is fully dereferenced, the cluster will release the data. If at some point in your code you are switching from the Future instance to its key and only retain the key, this would explain the situation.
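A minimal sketch of what that means in practice (an illustration with an in-process client, not code from the project above):

import distributed

with distributed.Client(processes=False) as client:
    fut = client.scatter(123)   # the scattered value is pinned on the cluster by this Future
    key = fut.key               # a plain string; holding only the key does not keep the data alive
    del fut                     # last Future reference dropped, so the scheduler may forget the key
    # Any later attempt to resolve `key` on the cluster can now fail,
    # because the data was released together with the Future.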
Hi @rikardn,
First dynamic workflow (works):
Failing dynamic workflow (see first function call for the suspect scattered object):
What do "first" and "second" workflow mean? Are they two keys inside the same dsk that end up summarized and retrieved at once through the results key? Or are they two iterative calls to run, each starting and destroying its own cluster?
In other words - could your algorithm be simplified down to this?
import distributed

with distributed.Client(processes=False) as client:
    dsk = {
        "results": (sum, ["first", "second"]),
        "first": client.scatter(123),
        "second": client.scatter(123),  # produces a future with the same key as first
    }
    print(client.get(dsk, "results"))
or to this?
with distributed.Client(processes=False) as client:
    dsk = {"results": client.scatter(123)}
    print("first", client.get(dsk, "results"))

with distributed.Client(processes=False) as client:
    dsk = {"results": client.scatter(123)}
    print("second", client.get(dsk, "results"))
The full workflow is something like:
def func():
    obj = create_object()
    dsk1 = create_workflow1(client.scatter(obj))
    res1 = run_dynamic_workflow(dsk1)
    dsk2 = create_workflow2(client.scatter(obj), res1)  # Note same object as before. New call to scatter.
    res2 = run_dynamic_workflow2(dsk2)
    return res2

def run_dynamic_workflow(dsk):
    client = get_client()
    futures = client.get(dsk, "result", sync=False)
    secede()
    res = client.gather(futures)
    rejoin()
    return res

with distributed.Client(processes=False) as client:
    dsk = {
        "results": (func,),
    }
    print(client.get(dsk, "results"))
Thanks @rikardn, this helps a lot.
An important nitpick though: did you accidentally omit

def func():
    client = get_client()

? In other words, do func (which scatters) and the two calls to run_dynamic_workflow call get_client() three times sequentially from the same thread?
Second important nit: in

dsk2 = create_workflow2(client.scatter(obj), res1)

is res1 a computed result, as your pseudocode suggests, or a Future to the output of the first workflow?
First:
No, that was not accidental. run_dynamic_workflow calls get_client for each dynamically created workflow it launches. So in this case get_client is run twice.
Second:
Sorry, yes, it is also scattered; I didn't think it was important. It is only obj that has a problem being scattered, since it is the only object being scattered twice to different workflows (or at least that's what I believe is the cause).
Third:
I added a third potentially important thing. We are actually using LocalCluster and get the client from it like this:
with LocalCluster(processes=False) as cluster, Client(cluster) as client:
Fourth: Another thing that I omitted is that obj is actually passed through the first dynamic workflow without change.
An updated example for reference:
def func():
    obj = create_object()
    dsk1 = create_workflow1(client.scatter(obj))
    res1, new_obj = run_dynamic_workflow(dsk1)  # Note that new_obj and obj are the same object, i.e. they have the same hash
    dsk2 = create_workflow2(client.scatter(new_obj), client.scatter(res1))
    res2 = run_dynamic_workflow2(dsk2)
    return res2

def run_dynamic_workflow(dsk):
    client = get_client()
    futures = client.get(dsk, "result", sync=False)
    secede()
    res = client.gather(futures)
    rejoin()
    return res

with LocalCluster(processes=False) as cluster, Client(cluster) as client:
    dsk = {
        "results": (func,),
    }
    print(client.get(dsk, "results"))
I realize my first point doesn't make sense. It is actually the run_dynamic_workflow that goes through the dsk dict and scatters objects. But as my example stands, there should be a call to get_client in func.
Sorry, yes, it is also scattered; I didn't think it was important.
It isn't. What I meant was whether res is actually just the raw output of client.get, e.g. distributed.Future objects. If you're going through a gather -> scatter, it's not important.
Reproduced.
I'm willing to bet that your pseudocode is missing a detail: before you're calling create_workflow2, you're dereferencing dsk1.
My reproducer falls apart almost immediately:
from time import sleep

import distributed

client = distributed.Client(processes=False)

while True:
    print(".", end="")
    x = client.scatter(123)
    assert client.cluster.scheduler.tasks[x.key].state == "memory"
    del x
    # while client.cluster.scheduler.tasks:
    #     sleep(0.01)
output:
..........
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
Cell In[37], line 9
7 print(".", end="")
8 x = client.scatter(123)
----> 9 assert client.cluster.scheduler.tasks[x.key].state == "memory"
10 del x
11 # while client.cluster.scheduler.tasks:
12 # sleep(0.01)
KeyError: 'int-4951b9977632a52fcd6f0cc65c57bb33'
If I uncomment the sleep after the deletion, it goes on indefinitely.
@crusaderky Just wow! I am amazed that you could figure this one out from my messy information. Thanks!
I can confirm that this issue is also in dask/distributed 2024.2.0. The reason it was triggered for me starting with 2024.2.1 was that the hash function in dask was changed so that it now gives the same hash for objects that are the same (which is a good change). One potential workaround is to use client.scatter(123, hash=False) to get unique keys and avoid the race condition.
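For illustration, a small sketch of that workaround (an in-process client assumed, not code from the affected project):

import distributed

with distributed.Client(processes=False) as client:
    a = client.scatter(123)              # key derived from the hash of the data
    b = client.scatter(123)              # same data, hence the same key as a
    c = client.scatter(123, hash=False)  # randomly generated, unique key
    d = client.scatter(123, hash=False)  # another unique key
    assert a.key == b.key
    assert c.key != a.key and c.key != d.key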
I recently encountered this error as well, and I was able to get it down to a somewhat minimal example.
import numpy as np
from distributed import Client, LocalCluster, Future
import gc
import pytest

cluster = LocalCluster(
    n_workers=2, threads_per_worker=2, dashboard_address=None, processes=True
)

class DataHolder():
    def __init__(self, data, client, raise_error=False):
        self._data_future = client.scatter(data, broadcast=True)
        if raise_error:
            raise ValueError("A value Error")

    def do_remote_work(self):
        future = client.submit(lambda x: x**2, self._data_future)
        return future

data = np.arange(10)

with Client(cluster) as client:
    with pytest.raises(ValueError):
        DataHolder(data, client, raise_error=True)
    # gc.collect()  # Adding an explicit garbage collection usually makes this work.
    holder = DataHolder(data, client)
    future_result = holder.do_remote_work()
    print(future_result.result())
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
Cell In[9], line 8
6 holder = DataHolder(data, client)
7 future_result = holder.do_remote_work()
----> 8 print(future_result.result())
File ~/codes/distributed/distributed/client.py:328, in Future.result(self, timeout)
326 self._verify_initialized()
327 with shorten_traceback():
--> 328 return self.client.sync(self._result, callback_timeout=timeout)
File ~/codes/distributed/distributed/client.py:342, in Future._result(self, raiseit)
340 exception = CancelledError(self.key)
341 if raiseit:
--> 342 raise exception
343 else:
344 return exception
CancelledError: lambda-b98162b441d4883a4f78c50643cd5f64
As pointed out, it's very likely due to the garbage collection of the first object happening while the second object is being created, which deletes the key that the second item creates. This makes me think there is some odd reference counting happening somewhere, or that there should be an "atomic" action somewhere where there isn't one.
I also do not immediately see where the client.scatter function would ever acquire the client's _refcount_lock (as opposed to the client.submit and client.map functions, which acquire it in client._graph_to_futures). It would seem to me that any item that creates a Future should be acquiring this lock at some point.
I also do not immediately see where the client.scatter function would ever acquire the client's _refcount_lock

This is a little hidden, but it is happening while the Future object is instantiated (see Future.bind_late).
This race condition is not related to reference counting but is caused by scatter and release using different comm channels, which creates an ordering problem. Fixing this is not straightforward (https://github.com/dask/distributed/issues/7480; see https://github.com/dask/distributed/pull/8430 for a suggestion for a systematic fix).
A mitigation (as already hinted at in https://github.com/dask/distributed/issues/8576#issuecomment-1994599613) is to sleep briefly after an old scatter future is released. That's why the gc.collect in https://github.com/dask/distributed/issues/8576#issuecomment-2121237380 helped. It didn't help due to actual garbage collection but because a couple of milliseconds went by and the scheduler received the message that the future was released.
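A sketch of that mitigation, mirroring the commented-out wait in the reproducer above (polling the scheduler directly only works for an in-process LocalCluster; on a remote cluster a short sleep is the blunt equivalent):

from time import sleep

import distributed

client = distributed.Client(processes=False)

x = client.scatter(123)
# ... use x ...
del x  # release the first scattered future

# Give the scheduler time to process the release before scattering
# data that hashes to the same key again.
while client.cluster.scheduler.tasks:
    sleep(0.01)

y = client.scatter(123)  # same key as before, but the old release has already been processed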
Another mitigation that's already been mentioned is to ensure that the name of every scattered future is unique. Providing hash=False to Client.scatter will generate a unique key for every submission instead of hashing the data. This will possibly cause some temporary data duplication on the cluster (while the new data arrives and the old data is released) but shouldn't have an impact otherwise. Whether this is a good workaround depends on the application, of course.
Triage summary
If you scatter a key, dereference the returned future, and then scatter the same key again (with the same value), the release of the first future is likely to reach the scheduler after the transition to memory of the second one, and you'll end up holding a Future to a forgotten task. This in turn will cause your computation to fail.
Reproducer below: https://github.com/dask/distributed/issues/8576#issuecomment-1994599613
Original post
I am using dask/distributed to run custom workflows. A workflow that used to work with version 2024.2.0 stopped working with 2024.2.1 (and still does not work with 2024.3.0). I have been unable to create a minimal reproducible example, but I have some information that could give clues to what the problem might be.
The setup is that a static workflow, run on a LocalCluster with threads, is calling two dynamic workflows (see the graphs below), and many of the objects are scattered. I think that the problem is that the scattered object with id ModelEntry-d3c014c28f9af6108cba2a6c960688ce is the same for both dynamic workflows. Using distributed 2024.2.0 (or earlier) gives different ids for this object even though it is the same object. Also, if telling scatter not to use hash, the workflow will run, I guess because the ids will then be different. Not scattering the object will also work. Given the log messages (see below), it seems as if we are losing the ModelEntry-d3c014c28f9af6108cba2a6c960688ce in the second dynamic workflow, and it seems to only happen when the scatter id is the same in the two workflows.

The stacktrace:
The log:
First dynamic workflow (works):
Failing dynamic workflow (see first function call for the suspect scattered object):